//种子数 staticfinallong[] SEED = { // A mixture of seeds from FNV-1a, CityHash, and Murmur3 0xc3a5c85c97cb3127L, 0xb492b66fbe98f273L, 0x9ae16a3b2f90404fL, 0xcbf29ce484222325L}; staticfinallong RESET_MASK = 0x7777777777777777L; staticfinallong ONE_MASK = 0x1111111111111111L;
// Threshold of observed events that triggers the periodic down sampling (see increment()'s
// javadoc: popularity is aged when observed events exceed a threshold).
int sampleSize;
// Mask for quickly turning a hash value into a table index.
// The table length is a power of two and tableMask is (length - 1), so "hash & tableMask"
// emulates the modulo operation with a single AND — much faster than '%'.
int tableMask;
// The one-dimensional long array holding the counters (each long packs 16 4-bit counters).
long[] table;
// Event count — presumably the number of increments since the last reset, compared against
// sampleSize; TODO confirm against the full class.
int size;
/**
 * Increments the popularity of the element if it does not exceed the maximum (15). The popularity
 * of all elements will be periodically down sampled when the observed events exceeds a threshold.
 * This process provides a frequency aging to allow expired long term entries to fade away.
 *
 * @param e the element to add
 */
public void increment(@NonNull E e) {
  if (isNotInitialized()) {
    return;
  }

  // Re-hash the key's hashCode with a supplemental hash function: the original hashCode may
  // not be uniformly distributed, so spread() scatters it further.
  int hash = spread(e.hashCode());
  // Each 64-bit slot is split into 16 groups of 4 bits. 'start' picks which group of four
  // consecutive counters to use: the low two bits of the hash, shifted left by 2, give a
  // value in {0, 4, 8, 12}.
  int start = (hash & 3) << 2;

  // indexOf() derives a table index from the hash and one of four different seeds, producing
  // four independent positions — one per hash function of the count-min sketch.
  int index0 = indexOf(hash, 0);
  int index1 = indexOf(hash, 1);
  int index2 = indexOf(hash, 2);
  int index3 = indexOf(hash, 3);
  // NOTE(review): the remainder of this method (the incrementAt calls and the sample-size
  // bookkeeping) is not visible in this excerpt.
/** * Increments the specified counter by 1 if it is not already at the maximum value (15). * * @param i the table index (16 counters) * @param j the counter to increment * @return if incremented */ booleanincrementAt(int i, int j){ //这个j表示16个等分的下标,那么offset就是相当于在64位中的下标(这个自己想想) int offset = j << 2; //上面提到Caffeine把频率统计最大定为15,即0xfL //mask就是在64位中的掩码,即1111后面跟很多个0 long mask = (0xfL << offset); //如果&的结果不等于15,那么就追加1。等于15就不会再加了 if ((table[i] & mask) != mask) { table[i] += (1L << offset); returntrue; } returnfalse; }
/** * Returns the table index for the counter at the specified depth. * * @param item the element's hash * @param i the counter depth * @return the table index */ intindexOf(int item, int i){ long hash = SEED[i] * item; hash += hash >>> 32; return ((int) hash) & tableMask; }
/** * Applies a supplemental hash function to a given hashCode, which defends against poor quality * hash functions. */ intspread(int x){ x = ((x >>> 16) ^ x) * 0x45d9f3b; x = ((x >>> 16) ^ x) * 0x45d9f3b; return (x >>> 16) ^ x; }
知道了追加方法,那么读取方法frequency就很容易理解了。
/** * Returns the estimated number of occurrences of an element, up to the maximum (15). * * @param e the element to count occurrences of * @return the estimated number of occurrences of the element; possibly zero but never negative */ @NonNegative publicintfrequency(@NonNull E e){ if (isNotInitialized()) { return0; }
//得到hash值,跟上面一样 int hash = spread(e.hashCode()); //得到等分的下标,跟上面一样 int start = (hash & 3) << 2; int frequency = Integer.MAX_VALUE; //循环四次,分别获取在table数组中不同的下标位置 for (int i = 0; i < 4; i++) { int index = indexOf(hash, i); //这个操作就不多说了,其实跟上面incrementAt是一样的,定位到table[index] + 等分的位置,再根据mask取出计数值 int count = (int) ((table[index] >>> ((start + i) << 2)) & 0xfL); //取四个中的较小值 frequency = Math.min(frequency, count); } return frequency; }
/**
 * Performs the pending maintenance work and sets the state flags during processing to avoid
 * excess scheduling attempts. The read buffer, write buffer, and reference queues are
 * drained, followed by expiration, and size-based eviction.
 *
 * @param task an additional pending task to run, or {@code null} if not present
 */
@GuardedBy("evictionLock")
void maintenance(@Nullable Runnable task) {
  // Mark the drain as in-flight so concurrent readers do not schedule redundant maintenance.
  lazySetDrainStatus(PROCESSING_TO_IDLE);

  try {
    drainReadBuffer();

    drainWriteBuffer();
    if (task != null) {
      task.run();
    }
    // NOTE(review): the rest of the try block (expiration, eviction, climb) and the finally
    // clause that restores the drain status are not visible in this excerpt.
//最大的个数限制 long maximum; //当前的个数 long weightedSize; //window区的最大限制 long windowMaximum; //window区当前的个数 long windowWeightedSize; //protected区的最大限制 long mainProtectedMaximum; //protected区当前的个数 long mainProtectedWeightedSize; //下一次需要调整的大小(还需要进一步计算) double stepSize; //window区需要调整的大小 long adjustment; //命中计数 int hitsInSample; //不命中的计数 int missesInSample; //上一次的缓存命中率 double previousSampleHitRate;
// The TinyLFU frequency sketch used to estimate entry popularity.
final FrequencySketch<K> sketch;
// LRU queue (accessed FIFO) for the window region.
final AccessOrderDeque<Node<K, V>> accessOrderWindowDeque;
// LRU queue for the probation region.
final AccessOrderDeque<Node<K, V>> accessOrderProbationDeque;
// LRU queue for the protected region.
final AccessOrderDeque<Node<K, V>> accessOrderProtectedDeque;
以及默认比例设置(意思看注释)
/** The initial percent of the maximum weighted capacity dedicated to the main space. */
static final double PERCENT_MAIN = 0.99d;
/** The percent of the maximum weighted capacity dedicated to the main's protected space. */
static final double PERCENT_MAIN_PROTECTED = 0.80d;
/** The difference in hit rates that restarts the climber. */
static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d;
/** The percent of the total size to adapt the window by. */
static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
/** The rate to decrease the step size to adapt by. */
static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d;
/** The maximum number of entries that can be transferred between queues. */
// NOTE(review): the constant this javadoc documents is cut off in this excerpt.
重点来了,evictEntries和climb方法:
/** Evicts entries if the cache exceeds the maximum. */ @GuardedBy("evictionLock") voidevictEntries(){ if (!evicts()) { return; } //淘汰window区的记录 int candidates = evictFromWindow(); //淘汰Main区的记录 evictFromMain(candidates); }
/**
 * Evicts entries from the window space into the main space while the window size exceeds a
 * maximum.
 *
 * @return the number of candidate entries evicted from the window space
 */
// Per W-TinyLFU, new entries are always admitted unconditionally into the admission window,
// but the window has a size limit, so the overflow must be trimmed periodically.
@GuardedBy("evictionLock")
int evictFromWindow() {
  int candidates = 0;
  // Look at the head (LRU end) of the window queue.
  Node<K, V> node = accessOrderWindowDeque().peek();
  // While the window exceeds its maximum, move the "overflow" entries out.
  while (windowWeightedSize() > windowMaximum()) {
    // The pending operations will adjust the size to reflect the correct weight
    if (node == null) {
      break;
    }
    // Remember the successor before unlinking the current node.
    Node<K, V> next = node.getNextInAccessOrder();
    if (node.getWeight() != 0) {
      // Mark the node as belonging to the probation region.
      node.makeMainProbation();
      // Unlink it from the window queue...
      accessOrderWindowDeque().remove(node);
      // ...and append it to the probation queue — i.e. promote it into the main space.
      accessOrderProbationDeque().add(node);
      candidates++;
      // Account for the weight that just left the window region.
      setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight());
    }
    // Continue with the next node.
    node = next;
  }

  return candidates;
}
evictFromMain方法:
/** * Evicts entries from the main space if the cache exceeds the maximum capacity. The main space * determines whether admitting an entry (coming from the window space) is preferable to retaining * the eviction policy's victim. This is decision is made using a frequency filter so that the * least frequently used entry is removed. * * The window space candidates were previously placed in the MRU position and the eviction * policy's victim is at the LRU position. The two ends of the queue are evaluated while an * eviction is required. The number of remaining candidates is provided and decremented on * eviction, so that when there are no more candidates the victim is evicted. * * @param candidates the number of candidate entries evicted from the window space */ //根据W-TinyLFU,从window晋升过来的要跟probation区的进行“PK”,胜者才能留下 @GuardedBy("evictionLock") voidevictFromMain(int candidates){ int victimQueue = PROBATION; //victim是probation queue的头部 Node<K, V> victim = accessOrderProbationDeque().peekFirst(); //candidate是probation queue的尾部,也就是刚从window晋升来的 Node<K, V> candidate = accessOrderProbationDeque().peekLast(); //当cache不够容量时才做处理 while (weightedSize() > maximum()) { // Stop trying to evict candidates and always prefer the victim if (candidates == 0) { candidate = null; }
/** * Determines if the candidate should be accepted into the main space, as determined by its * frequency relative to the victim. A small amount of randomness is used to protect against hash * collision attacks, where the victim's frequency is artificially raised so that no new entries * are admitted. * * @param candidateKey the key for the entry being proposed for long term retention * @param victimKey the key for the entry chosen by the eviction policy for replacement * @return if the candidate should be admitted and the victim ejected */ @GuardedBy("evictionLock") booleanadmit(K candidateKey, K victimKey){ //分别获取victim和candidate的统计频率 //frequency这个方法的原理和实现上面已经解释了 int victimFreq = frequencySketch().frequency(victimKey); int candidateFreq = frequencySketch().frequency(candidateKey); //谁大谁赢 if (candidateFreq > victimFreq) { returntrue;
//如果相等,candidate小于5都当输了 } elseif (candidateFreq <= 5) { // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm // candidate reduces the number of random acceptances to minimize the impact on the hit rate. returnfalse; } //如果相等且candidate大于5,则随机淘汰一个 int random = ThreadLocalRandom.current().nextInt(); return ((random & 127) == 0); }
/** * Increases the size of the admission window by shrinking the portion allocated to the main * space. As the main space is partitioned into probation and protected regions (80% / 20%), for * simplicity only the protected is reduced. If the regions exceed their maximums, this may cause * protected items to be demoted to the probation region and probation items to be demoted to the * admission window. */
//增加window区的大小,这个方法比较简单,思路就像我上面说的 @GuardedBy("evictionLock") voidincreaseWindow(){ if (mainProtectedMaximum() == 0) { return; }
/** Decreases the size of the admission window and increases the main's protected region. */ //同上increaseWindow差不多,反操作 @GuardedBy("evictionLock") voiddecreaseWindow(){ if (windowMaximum() <= 1) { return; }
/** * Performs the post-processing work required after a read. * * @param node the entry in the page replacement policy * @param now the current time, in nanoseconds * @param recordHit if the hit count should be incremented */ voidafterRead(Node<K, V> node, long now, boolean recordHit){ if (recordHit) { statsCounter().recordHits(1); } //把记录加入到readBuffer //判断是否需要立即处理readBuffer //注意这里无论offer是否成功都可以走下去的,即允许写入readBuffer丢失,因为这个 boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL); if (shouldDrainBuffers(delayable)) { scheduleDrainBuffers(); } refreshIfNeeded(node, now); }
/** * Returns whether maintenance work is needed. * * @param delayable if draining the read buffer can be delayed */
//caffeine用了一组状态来定义和管理“维护”的过程 booleanshouldDrainBuffers(boolean delayable){ switch (drainStatus()) { case IDLE: return !delayable; case REQUIRED: returntrue; case PROCESSING_TO_IDLE: case PROCESSING_TO_REQUIRED: returnfalse; default: thrownew IllegalStateException(); } }
重点看BoundedBuffer
/**
 * A striped, non-blocking, bounded buffer.
 *
 * @author ben.manes@gmail.com (Ben Manes)
 * @param <E> the type of elements maintained by this buffer
 */
// NOTE(review): the class body is not visible in this excerpt.
final class BoundedBuffer<E> extends StripedBuffer<E>
/** * A base class providing the mechanics for supporting dynamic striping of bounded buffers. This * implementation is an adaptation of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64} * class, which is used by atomic counters. The approach was modified to lazily grow an array of * buffers in order to minimize memory usage for caches that are not heavily contended on. * * @author dl@cs.oswego.edu (Doug Lea) * @author ben.manes@gmail.com (Ben Manes) */
/** Number of CPUS. */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/** The bound on the table size. */
// Upper bound on the number of stripes: 4 x (CPU count rounded up to a power of two).
static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU);

/** The maximum number of attempts when trying to expand the table. */
// Retry budget when a CAS fails under contention.
static final int ATTEMPTS = 3;

/** Table of buffers. When non-null, size is a power of 2. */
// The core data structure: one buffer per stripe.
transient volatile Buffer<E> @Nullable[] table;

/** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */
transient volatile int tableBusy;
/** CASes the tableBusy field from 0 to 1 to acquire lock. */
final boolean casTableBusy() {
  return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1);
}
/**
 * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of
 * packaging restrictions.
 */
// Reads the thread's probe field directly via Unsafe; used to pick this thread's stripe.
static final int getProbe() {
  return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
}
/** * Handles cases of updates involving initialization, resizing, creating new Buffers, and/or * contention. See above for explanation. This method suffers the usual non-modularity problems of * optimistic retry code, relying on rechecked sets of reads. * * @param e the element to add * @param wasUncontended false if CAS failed before call */
//这个方法比较长,但思路还是相对清晰的。 @SuppressWarnings("PMD.ConfusingTernary") finalvoidexpandOrRetry(E e, boolean wasUncontended){ int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (int attempt = 0; attempt < ATTEMPTS; attempt++) { Buffer<E>[] buffers; Buffer<E> buffer; int n; if (((buffers = table) != null) && ((n = buffers.length) > 0)) { if ((buffer = buffers[(n - 1) & h]) == null) { if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer boolean created = false; try { // Recheck under lock Buffer<E>[] rs; int mask, j; if (((rs = table) != null) && ((mask = rs.length) > 0) && (rs[j = (mask - 1) & h] == null)) { rs[j] = create(e); created = true; } } finally { tableBusy = 0; } if (created) { break; } continue; // Slot is now non-empty } collide = false; } elseif (!wasUncontended) { // CAS already known to fail wasUncontended = true; // Continue after rehash } elseif (buffer.offer(e) != Buffer.FAILED) { break; } elseif (n >= MAXIMUM_TABLE_SIZE || table != buffers) { collide = false; // At max size or stale } elseif (!collide) { collide = true; } elseif (tableBusy == 0 && casTableBusy()) { try { if (table == buffers) { // Expand table unless stale table = Arrays.copyOf(buffers, n << 1); } } finally { tableBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } elseif ((tableBusy == 0) && (table == buffers) && casTableBusy()) { boolean init = false; try { // Initialize table if (table == buffers) { @SuppressWarnings({"unchecked", "rawtypes"}) Buffer<E>[] rs = new Buffer[1]; rs[0] = create(e); table = rs; init = true; } } finally { tableBusy = 0; } if (init) { break; } } } }
最后看看RingBuffer,注意RingBuffer是BoundedBuffer的内部类。
/** The maximum number of elements per buffer. */
static final int BUFFER_SIZE = 16;

// Assume 4-byte references and 64-byte cache line (16 elements per line)
// The backing array is 256 slots long, but elements are spaced 16 slots apart, so it holds at
// most 16 elements; the spacing keeps adjacent entries on separate cache lines.
static final int SPACED_SIZE = BUFFER_SIZE << 4;
static final int SPACED_MASK = SPACED_SIZE - 1;
static final int OFFSET = 16;
// The ring buffer's backing array.
final AtomicReferenceArray<E> buffer;
//插入方法 @Override publicintoffer(E e){ long head = readCounter; long tail = relaxedWriteCounter(); //用head和tail来限制个数 long size = (tail - head); if (size >= SPACED_SIZE) { return Buffer.FULL; } //tail追加16 if (casWriteCounter(tail, tail + OFFSET)) { //用tail“取余”得到下标 int index = (int) (tail & SPACED_MASK); //用unsafe.putOrderedObject设值 buffer.lazySet(index, e); return Buffer.SUCCESS; } //如果CAS失败则返回失败 return Buffer.FAILED; }
//用consumer来处理buffer的数据 @Override publicvoiddrainTo(Consumer<E> consumer){ long head = readCounter; long tail = relaxedWriteCounter(); //判断数据多少 long size = (tail - head); if (size == 0) { return; } do { int index = (int) (head & SPACED_MASK); E e = buffer.get(index); if (e == null) { // not published yet break; } buffer.lazySet(index, null); consumer.accept(e); //head也跟tail一样,每次递增16 head += OFFSET; } while (head != tail); lazySetReadCounter(head); }
注意,ring buffer 的 size(固定是 16 个)是不变的,变的是 head 和 tail 而已。
总的来说ReadBuffer有如下特点:
使用 Striped-RingBuffer 来提升对 buffer 的并发读写性能
用 thread 的 hash 来避开热点 key 的竞争
允许写入的丢失
WriteBuffer
writeBuffer跟readBuffer不一样,主要体现在使用场景的不一样。本来缓存的一般场景是读多写少的,读的并发会更高,且 afterRead 显得没那么重要,允许延迟甚至丢失。写不一样,写afterWrite不允许丢失,且要求尽量马上执行。Caffeine 使用MPSC(Multiple Producer / Single Consumer)作为 buffer 数组,实现在MpscGrowableArrayQueue类,它是仿照JCTools的MpscGrowableArrayQueue来写的。
/** * Schedules a timer event for the node. * * @param node the entry in the cache */ publicvoidschedule(@NonNull Node<K, V> node){ Node<K, V> sentinel = findBucket(node.getVariableTime()); link(sentinel, node); }
/**
 * Determines the bucket that the timer event should be added to.
 *
 * @param time the time when the event fires
 * @return the sentinel at the head of the bucket
 */
Node<K, V> findBucket(long time) {
  // How far in the future, relative to the wheel's current time, the event fires.
  long duration = time - nanos;
  int last = wheel.length - 1;
  // Pick the finest level whose next span still exceeds the duration.
  for (int level = 0; level < last; level++) {
    if (duration < SPANS[level + 1]) {
      // Convert the absolute time into this level's ticks and mask it onto a slot
      // (each level's length is a power of two).
      int slot = (int) ((time >>> SHIFT[level]) & (wheel[level].length - 1));
      return wheel[level][slot];
    }
  }
  // Durations beyond every span fall into the single overflow bucket of the last level.
  return wheel[last][0];
}
/** Adds the entry at the tail of the bucket's list. */
void link(Node<K, V> sentinel, Node<K, V> node) {
  // Splice the node in just before the sentinel, i.e. at the tail of the circular list.
  node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
  node.setNextInVariableOrder(sentinel);
  // NOTE(review): the remaining pointer updates of the doubly-linked splice are not visible
  // in this excerpt.
Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。
Sublime Text
Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。