redis-cluster和Codis关于slot迁移原理对比
概述
Codis是基于proxy架构的redis集群方案,如图1所示,即客户端的请求会先发送到proxy,由proxy做sharding后转发到后端redis实例。这个sharding的规则(常称之为路由表、转发表、slot表等)保存在集中化的组件(比如zookeeper、文件系统等)上,然后由Dashboard统一配置到所有Proxy上。相比而言,redis自己的集群方案redis-cluster则是无中心化的架构,如图2所示,它没有集中化的控制组件和proxy,客户端可以向集群内的任意一台节点发送请求,然后根据节点的返回值做重定向(MOVE或ASK)操作,客户端本地也会缓存slot表,并根据每次的重定向信息来更新这个表。由于没有中心化组件存储或配置路由表,因此redis-cluster使用gossip在集群间同步路由表和集群拓补信息,在经过一段时间时候,理想情况下集群中每个节点都掌握了整个集群的路由信息。
图1 Codis架构图
图2 redis-cluster
对nosql数据库而言,水平扩缩容(Scale in/out)是一项基本的能力。Scale in/out是指可以动态的添加或删除集群中的节点,来水平扩展或收缩集群容量和CPU算力,它和纵向扩缩容(Scale up/down)是相对的。由于nosql是没有schema的,一般都是简单的kv结构(或者是kkv结构),因此做Scale in/out还是相对而言比较容易的。因为key是按照slot为单位进行sharding的(常见公式有:crc16(key) % slot_num,如图3 ),因此只要将一个实例上的某些slots迁移到其它节点上,再把路由表(即slot和node的映射关系)更新即可。虽然Codis和redis-cluster都支持这种slot迁移的Scale in/out,但是他们的实现方式还是有诸多区别的,接下来本文会阐述它们的不同。
图3 key-slot-node映射关系
Slot迁移难点
将一个redis上指定slot下的所有key迁移到其他redis上并不麻烦。其实只要两步,第一步先获取这个slot下所有key,然后对每个key发送迁移命令即可。由于redis本身没有slot的概念,更不维护key与slot的映射关系,因此第一步是需要改造redis引擎,使其可以维护key与slot的映射关系,这一点redis-cluster和Codis都是这么做的(比如使用一个单独的dict数组来维护这种索引关系,每个数组的下标就是slot num,每个数组元素是一个dick,里面存放的是<key、crc> pair)。第二步发送就比较简单了,redis原生支持对一些key进行迁移的命令:MIGRATE,如下:
MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password] KEYS key1 key2 ... keyN
redis-cluster的确就是直接使用MIGRATE 命令进行key的迁移,但是这个命令是同步阻塞的,鉴于redis单线程的特性,当MIGRATE耗时太久(比如网络较慢、迁移bigkey)时会导致主线程无法处理用户请求,从而导致用户RT变大甚至超时。因此,直接使用MIGRATE命令虽然方便,但是有诸多限制。Codis自己修改了redis引擎,加入了slots同步迁移和异步迁移的功能(同步迁移比较简单,本文不再赘述)。
因此,要想做到平滑的、用户基本无感的scale in/out,slot迁移需要解决以下几个难点:
- 不能使用同步阻塞的迁移方式,否则对于bigkey或者慢网络迁移会阻塞主线程正常服务
- 对bigkey的迁移需要特殊处理,否则在bigkey的序列化、发送、反序列化时都可能导致源redis实例和目标redis实例主线程阻塞
- 单个key的迁移过程需要保证原子性,即要么一个key全部迁移成功,要么全部迁移失败,不存在中间状态
- 对迁移中的slot的读写处理,即一个slot如果正处于迁移过程中,其中的key(已迁移走、迁移中或等待迁移)是否可以被正常读写
Redis-Cluster实现
图4 redis-cluster slot迁移
slot分配
如图4所述,redis-cluster为了支持slot迁移,改造引擎加入了key和slot的映射关系。redis-cluster使用rax树来维护这个关系,因此在新建集群、集群扩缩容的时候,都会涉及到slot分配、删除等操作,这些操作主要通过以下命令实现:
- cluster addslots <slot> [slot ...] 将一个或多个槽(slot)指派(assign)给当前节点。
- cluster delslots <slot> [slot ...] 移除一个或多个槽对当前节点的指派。
- cluster flushslots 移除指派给当前节点的所有槽,让当前节点变成一个没有指派任何槽的节点。
- cluster setslot <slot> node <node_id> 将槽 slot 指派给 node_id 指定的节点,如果槽已经指派给另一个节点,那么先让另一个节点删除该槽>,然后再进行指派。
key-slot操作
一旦映射关系建立好,接下来就可以执行key相关的slot命令,redis-cluster提供了以下几个命令:
- cluster slot <key> 计算键 key 应该被放置在哪个槽上。
- cluster countkeysinslot <slot> 返回槽 slot 目前包含的键值对数量。
- cluster getkeysinslot <slot> <count> 返回 count 个 slot 槽中的键。
slot迁移流程
redis-cluster在迁移一个slot的时候具体流程如下:
- 对目标节点发送 cluster setslot <slot> importing <sourceNodeId> 命令,让目标节点准备导入槽的数据。
- 对源节点发送 cluster setslot <slot> migrating <targetNodeId> 命令,让源节点准备迁出槽的数据。
- 源节点循环执行 cluster getkeysinslot <slot> <count> 命令,获取count个属于槽slot的键。
- 在源节点上执行 migrate <targetIp> <targetPort> "" 0 <timeout> keys <keys...> 命令,把获取的键通过流水线(pipeline)机制批量迁移到目标节点。
- 重复执行步骤3和步骤4直到槽下所有的键值数据迁移到目标节点。
- 向集群内所有主节点发送cluster setslot <slot> node <targetNodeId>命令,通知槽分配给目标节点。为了保证槽节点映射变更及时传播,需要遍历发送给所有主节点更新被迁移的槽指向新节点。
如果中途想取消一个迁移,可以向节点发送 cluster setslot <slot> stable 取消对槽 slot 的导入(import)或者迁移(migrate)状态。
key迁移的原子性:
由于migrate命令是同步阻塞的(同步发送并同步接收),迁移过程会阻塞该引擎上的所有key的读写,只有在迁移响应成功之后才会将本地key删除,因此迁移是原子的。
迁移中的读写冲突:
因为MIGRATE命令是同步阻塞的,因此不会存在一个key正在被迁移又同时被读写的情况,但是由于一个slot下可能有部分key被迁移完成,部分key正在等待迁移的情况,为此如果读写的一个key所属的slot正在被迁移,redis-cluster做如下处理:
- 客户端根据本地slots缓存发送命令到源节点,如果存在键对象则直接执行并返回结果给客户端。
- 如果键对象不存在,但是key所在的slot属于本节点,则可能存在于目标节点,这时源节点会回复ASK重定向异常。格式如下:(error)ASK <slot> <targetIP>:<targetPort>。
- 客户端从ASK重定向异常提取出目标节点信息,发送asking命令到目标节点打开客户端连接标识,再执行键命令。如果存在则执行,不存在则返回不存在信息
- 如果key所在的slot不属于本节点,则返回MOVE重定向。格式如下:(error)MOVED <slot> <targetIP>:<targetPort>。
总结
redis-cluster让redis集群化,Scale能力拓展了分布式的灵活性。但是也给redis带来了一些限制,其实这些限制也是其他redis集群方案基本都有的。比如,由于redis追求简单、高性能,并不支持跨节点(分布式)事务,因此一些涉及到可能跨节点的操作都将被限制,主要有:
- key批量操作支持有限。如mset、mget,目前只支持具有相同slot值的key执行批量操作。对于映射为不同slot值的key由于执行mget、mget等操作可能存在于多个节点上因此不被支持。
- key事务操作支持有限。同理只支持多key在同一节点上的事务操作,当多个key分布在不同的节点上时无法使用事务功能。
- key作为数据分区的最小粒度,因此不能将一个大的键值对象如hash、list等映射到不同的节点。
- 不支持多数据库空间。单机下的Redis可以支持16个数据库,集群模式下只能使用一个数据库空间,即db 0。
- 复制结构只支持一层,从节点只能复制主节点,不支持嵌套树状复制结构。
- migrate是同步阻塞迁移,对于bigkey迁移会导致引擎阻塞,从而影响对该引擎的所有key的读写。
Codis实现
slot分配
和redis-cluster不同,codis的redis上不会维护slot表信息,每个redis都默认自己负责1024个slot,slot表是维护在Dashboard并被Proxy感知的,这一点算是Codis的架构一个较大的特点。
key相关命令:
Codis只提供了一个key相关的slot命令:slotshashkey [key1 key2...] , 获取key所对应的hashslot。
迁移过程:
- Dashboard制定迁移计划,并主动发起迁移
- 期间Proxy也可以发起被动迁移(对于同步迁移而言)
- Dashboard向目标redis循环发送 slotsmgrttagslot-async $host $port $timeout $maxbulks $maxbytes $slot $numkeys 命令开启异步迁移(每次随机迁移最多numkeys个key,不指定默认为100)
- 直到被迁移的slot内所有的key已经被迁移成功,则迁移结束
具体流程可见图5。
图5 codis slot迁移流程
key迁移的原子性:
由于codis使用异步迁移slotsmgrttagslot-async命令,因此无法像redis-cluster那样利用MIGRATE命令同步阻塞的特性保证key迁移的原子性。为此,Codis做了以下手段来保证key的原子性:
- 迁移中的key为只读状态,对于写命令则返回TRAGIN错误由Proxy进行重试
- 对于bigkey进行拆分迁移,每个拆分指令会在目标redis上设置临时TTL(迁移完成再修正),如果中途迁移失败,那么最终目标redis上的key会过期被删除
- 只有key整个迁移成功正确收到响应,才会将本地key删除
迁移中的读写冲突:
和redis-cluster同步迁移不同,Codis由于使用异步迁移,因此一个正处于迁移状态的key(即key已经被发送或者被部分发送,还没有得到最终响应)是可能被用户继续读写的,为此除了像redis-cluster那样要考虑迁移中的slot,Codis还需要考虑迁移中的key的读写冲突处理。
对于一个读写请求,如果key所在的slot正在被迁移 ,proxy会使用slotsmgrt-exec-wrapper $hashkey $command [$arg1 ...] 命令对原始请求进行包装一下再发送给redis,如果原始命令是读操作则可以正常响应,如果是写操作则redis返回TRYAGIN错误,由Proxy进行重试。如果key已经迁移走,则引擎返回MOVED错误,Proxy需要更新路由表,具体过程如图6所示。
图6 codis对迁移中的key的读写处理
同步迁移与异步迁移
本文将详细描述同步迁移和异步迁移的实现原理。
同步迁移
图7所示的就是同步迁移的流程,源端会将key进行序列化,然后使用socket将数据发送到目标redis(其实就是调用restore命令),目标redis收到restore命令后会对key进行反序列化,存储到DB之后回复ACK,源端redis收到ACK之后就将本地的key删除。可以看到,整个过程,源端redis都是阻塞的,如果迁移的key是一个bigkey,会导致源端序列化、网络传输、目标端反序列化、源端同步删除非常耗时,由于redis的单线程特性,时间循环(eventloop)无法及时处理用户的读写事件,从而导致用户RT增高甚至超时。
图7 同步迁移流程
由于redis支持list、set、zset、hash等复合数据结构,因此会有bigkey的问题。图8所示的就是MIGRATE命令实现原理,在MIGRATE中,所谓的序列化其实就是将key对应的value进行RDB格式化,在目标端redis按照RDB格式进行加载。如果list、set、zset、hash成员很多(比如几千个甚至几万个),那么RDB格式化和加载就会非常耗时。
图8 MIGRATE命令原理
异步迁移
既然同步迁移会阻塞主线程,那么很容易想到的解决方案就是使用一个独立线程做迁移,如图9所示。由于多线程会设计到对共享数据(比如DB)的访问,因此需要加同步原语,这对redis单线程、几乎无锁的架构而言,改动起来是比较复杂的。
图9 独立线程实现异步迁移
另一种异步迁移实现思路,是依然采用单线程模型,即对象的序列化(在源redis端)和反序列化(在目标redis端)依然会阻塞主线程,但是和MIGRATE同步迁移不同,异步迁移不会同步等待restore的返回,restore完成之后目标端redis会向源端redis发送一个restore-ack命令(类似于回调机制)来通知源端redis迁移的状态。因此这样大大的减少了源端redis迁移的阻塞时间,可以让事件循环(eventloop)尽快的处理下一个就绪事件。
由于这种方案依然依赖于主线程做序列化和反序列化,因此,为了进一步降低序列化和反序列化的耗时,Codis使用拆分指令(chunked)的方式对bigkey做迁移处理。如图10所示,对于一个list而言,假设其包含非常多的elem,如果一次性将其全部序列化则非常耗时,如果将其等价拆分成一条条RPUSH指令,则每一条指令则非常的轻量。
图10 指令拆分
使用指令拆分之后,原本一个key只需要一条restore命令的迁移,现在变成很多条,因此为了保证迁移的原子性(即不会存在一些elem迁移成功,一些elem迁移失败),Codis会在每一个拆分指令中加上一个临时TTL,由于只有全部前已成功才会删除本地的key,因此即使中途迁移失败,已迁移成功的elem也会超时自动删除,最终效果就好比迁移没有发生一样。elem全部迁移成功之后,Codis会再单独发送一个修正TTL的命令并删除本地的key。
图11 临时TTL
异步迁移的第一步,就是先发一条DEL命令删除目标redis上的key,如图12所示。
图12 第一步先删除目标key
如图13所示,接下来收到目标redis的ACK之后会继续发送后续的拆分指令,每次发送的拆分指令的个数是可以参数控制的。
图13 临时TTL
所有的拆分指令全部发送完成之后,会再发一个修成TTL的指令,最后删除本地的key。
图14 迁移完成删除本地的key
并不是所有的key都会采用chunked的方式迁移,对于string对象、小对象依然可以直接使用RDB格式序列化,只有对于大对象(bigkey)才会触发chunked方式迁移。
图15 针对不同对象使用不同迁移方式
异步迁移源码解读
前文主要论述了redis-cluster同步迁移和Codis异步迁移的异同和原理,redis-cluster同步迁移可以参考redis源码中cluster.c中关于migrateCommand和restoreCommand实现,源码还是非常简单的。Codis的slot迁移提供了同步和异步两种,同步迁移的代码在slots.c中,其代码和redis原生的migrateCommand基本一致,因此两者观其一即可。异步迁移代码在slots_async.c中,这块的原创性就比较高了,由于原作者对代码基本没有加注释,因此为了便于理解,我在阅读源码的时候简单的加了一些中文注释,就贴在这里吧。原理如前文所述,想看实现的可以看下面的代码,我就不一一拆分解释了,因为太多了。。。
#include "server.h" /* ============================ Worker Thread for Lazy Release ============================= */ typedef struct { pthread_t thread;/* lazy工作线程 */ pthread_mutex_t mutex;/* 互斥信号量 */ pthread_cond_t cond;/* 条件变量 */ list *objs; /* 要被lazy释放的对象链表 */ } lazyReleaseWorker; /* lazy释放主线程 */ static void * lazyReleaseWorkerMain(void *args) { lazyReleaseWorker *p = args; while (1) { /* 等待在条件变量上,条件为待释放对象链表长度为0 */ pthread_mutex_lock(&p->mutex); while (listLength(p->objs) == 0) { pthread_cond_wait(&p->cond, &p->mutex); } /* 取出链表的第一个节点 */ listNode *head = listFirst(p->objs); /* 节点值为要释放的对象 */ robj *o = listNodeValue(head); /* 从链表中删除这个节点 */ listDelNode(p->objs, head); pthread_mutex_unlock(&p->mutex); /* 释放对象 */ decrRefCount(o); } return NULL; } /* lazy释放一个对象 */ static void lazyReleaseObject(robj *o) { /* 对象当前的refcount必须已经为1,即已经没有任何人引用这个对象 */ serverAssert(o->refcount == 1); /* 获取lazyReleaseWorker */ lazyReleaseWorker *p = server.slotsmgrt_lazy_release; /* 上锁 */ pthread_mutex_lock(&p->mutex); if (listLength(p->objs) == 0) { /* 如果待释放队列长度为0,则唤醒释放线程 */ pthread_cond_broadcast(&p->cond); } /* 将待释放对象加入释放链表 */ listAddNodeTail(p->objs, o); /* 解锁 */ pthread_mutex_unlock(&p->mutex); } /* 创建lazy释放工作线程 */ static lazyReleaseWorker * createLazyReleaseWorkerThread() { lazyReleaseWorker *p = zmalloc(sizeof(lazyReleaseWorker)); pthread_mutex_init(&p->mutex, NULL); pthread_cond_init(&p->cond, NULL); p->objs = listCreate(); /* 创建线程 */ if (pthread_create(&p->thread, NULL, lazyReleaseWorkerMain, p) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Worker Thread for Lazy Release Jobs."); exit(1); } return p; } /* 初始化Lazy释放工作线程 */ void slotsmgrtInitLazyReleaseWorkerThread() { server.slotsmgrt_lazy_release = createLazyReleaseWorkerThread(); } /* ============================ Iterator for Data Migration ================================ */ #define STAGE_PREPARE 0 #define STAGE_PAYLOAD 1 #define STAGE_CHUNKED 2 #define STAGE_FILLTTL 3 #define STAGE_DONE 4 /* 单对象迭代器 */ typedef struct { int stage; robj *key;/* 单对象对应的的key */ robj *val;/* 单对象对应的的值 */ long long expire;/* 该对象对应的过期设置 */ unsigned long cursor;/* 游标,用于dictScan */ unsigned long lindex;/* 索引,listTypeInitIterator时用到 */ unsigned long zindex;/* 索引,遍历zset时用到 */ unsigned long chunked_msgs;/* 该对象chunked消息个数 */ } singleObjectIterator; /* 创建单对象迭代 */ static singleObjectIterator * createSingleObjectIterator(robj *key) { /* 分配空间 */ singleObjectIterator *it = zmalloc(sizeof(singleObjectIterator)); /* 初始化阶段 */ it->stage = STAGE_PREPARE; /* 设置key */ it->key = key; /* 引用计数 */ incrRefCount(it->key); it->val = NULL; it->expire = 0; it->cursor = 0; it->lindex = 0; it->zindex = 0; it->chunked_msgs = 0; return it; } /* 释放SingleObjectIterator */ static void freeSingleObjectIterator(singleObjectIterator *it) { if (it->val != NULL) { /* 对val解引用 */ decrRefCount(it->val); } /* 对key解引用 */ decrRefCount(it->key); /* 释放结构 */ zfree(it); } static void freeSingleObjectIteratorVoid(void *it) { freeSingleObjectIterator(it); } /* 判断单个对象是否还有下一个阶段需要处理 */ static int singleObjectIteratorHasNext(singleObjectIterator *it) { /* 只要状态不是STAGE_DONE就还需要继续处理 */ return it->stage != STAGE_DONE; } /* 如果是sds编码的字符串对象就返回sds底层字符换的长度,否则返回默认长度len */ static size_t sdslenOrElse(robj *o, size_t len) { return sdsEncodedObject(o) ? sdslen(o->ptr) : len; } /* 如果val类型为dict时执行dictScan操作的回调 */ static void singleObjectIteratorScanCallback(void *data, const dictEntry *de) { /* 提取privdata {ll, val, &len}*/ void **pd = (void **)data; list *l = pd[0];/* 链表,用于存放scan出来的元素 */ robj *o = pd[1];/* 被迭代的对象值val */ long long *n = pd[2];/* 返回字节数的指针 */ robj *objs[2] = {NULL, NULL}; switch (o->type) { case OBJ_HASH: /* 如果原对象是hash,则分别将hash的key和value按顺序方式链表 */ objs[0] = dictGetKey(de); objs[1] = dictGetVal(de); break; case OBJ_SET: /* 如果原对象是set,则只将hash的key放入链表 */ objs[0] = dictGetKey(de); break; } /* 将扫出来的对象添加到链表 */ for (int i = 0; i < 2; i ++) { if (objs[i] != NULL) { /* 引用计数 */ incrRefCount(objs[i]); /* 这个对象的大小,对于string对象就是string长度,其他对象就按8字节算 */ *n += sdslenOrElse(objs[i], 8); listAddNodeTail(l, objs[i]); } } } /* 将double转为内存二进制表示 */ static uint64_t convertDoubleToRawBits(double value) { union { double d; uint64_t u; } fp; fp.d = value; return fp.u; } /* 将内存二进制表示转为double值 */ static double convertRawBitsToDouble(uint64_t value) { union { double d; uint64_t u; } fp; fp.u = value; return fp.d; } /* 从Uint64创建RawString对象 */ static robj * createRawStringObjectFromUint64(uint64_t v) { uint64_t p = intrev64ifbe(v); return createRawStringObject((char *)&p, sizeof(p)); } /* 从RawString获取Uint64 */ static int getUint64FromRawStringObject(robj *o, uint64_t *p) { if (sdsEncodedObject(o) && sdslen(o->ptr) == sizeof(uint64_t)) { *p = intrev64ifbe(*(uint64_t *)(o->ptr)); return C_OK; } return C_ERR; } /* 计算一个对象需要的restore命令的个数,单个restore上只能携带maxbulks个Bulk Bulk:$6\r\nfoobar\r\n Multi-bulk :"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n" */ static long numberOfRestoreCommandsFromObject(robj *val, long long maxbulks) { long long numbulks = 0; switch (val->type) { case OBJ_LIST: if (val->encoding == OBJ_ENCODING_QUICKLIST) { /* list的长度就是需要的Bulk的数目 */ numbulks = listTypeLength(val); } break; case OBJ_HASH: if (val->encoding == OBJ_ENCODING_HT) { /* hash表中每个元素需要2个Bulk */ numbulks = hashTypeLength(val) * 2; } break; case OBJ_SET: if (val->encoding == OBJ_ENCODING_HT) { /* set中每个元素需要1个Bulk */ numbulks = setTypeSize(val); } break; case OBJ_ZSET: if (val->encoding == OBJ_ENCODING_SKIPLIST) { /* zset中每个元素需要2个Bulk */ numbulks = zsetLength(val) * 2; } break; } /* 如果实际的numbulks比要求的maxbulks小,则使用一条restore命令 */ if (numbulks <= maxbulks) { return 1; } /* 计算需要的restore命令个数 */ return (numbulks + maxbulks - 1) / maxbulks; } /* 估计Restore命令的个数 */ static long estimateNumberOfRestoreCommands(redisDb *db, robj *key, long long maxbulks) { /* 查找key对应的val */ robj *val = lookupKeyWrite(db, key); if (val != NULL) { return numberOfRestoreCommandsFromObject(val, maxbulks); } return 0; } extern void createDumpPayload(rio *payload, robj *o); extern zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank); static slotsmgrtAsyncClient *getSlotsmgrtAsyncClient(int db); /* 单对象迭代,返回值为命令个数(Bulks) */ static int singleObjectIteratorNext(client *c, singleObjectIterator *it, long long timeout, unsigned int maxbulks, unsigned int maxbytes) { /* * * STAGE_PREPARE ---> STAGE_PAYLOAD ---> STAGE_DONE * | A * V | * +------------> STAGE_CHUNKED ---> STAGE_FILLTTL * A | * | V * +-------+ * */ /* 本次迭代的key */ robj *key = it->key; /* 但对象迁移的准备阶段 */ if (it->stage == STAGE_PREPARE) { /* 以写的方式查找key,与lookupKeyRead区别是没有命中率更新 */ robj *val = lookupKeyWrite(c->db, key); if (val == NULL) { /* 如果key没有找到,则结束 */ it->stage = STAGE_DONE; return 0; } /* 设置值 */ it->val = val; /* 增加引用 */ incrRefCount(it->val); /* 设置过期时间 */ it->expire = getExpire(c->db, key); /* 前导消息 */ int leading_msgs = 0; /* 获取db对应的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); if (ac->c == c) { /* 只有slotsmgrtAsyncClient未被使用的时候 */ if (ac->used == 0) { /* 表示已经被使用 */ ac->used = 1; /* 如果需要验证 */ if (server.requirepass != NULL) { /* SLOTSRESTORE-ASYNC-AUTH $password */ addReplyMultiBulkLen(c, 2); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-AUTH"); addReplyBulkCString(c, server.requirepass); leading_msgs += 1; } /* SELECT DB操作 */ do { /* SLOTSRESTORE-ASYNC-SELECT $db */ addReplyMultiBulkLen(c, 2); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-SELECT"); addReplyBulkLongLong(c, c->db->id); leading_msgs += 1; } while (0); } } /* SLOTSRESTORE-ASYNC delete $key */ addReplyMultiBulkLen(c, 3); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "delete"); addReplyBulk(c, key); /* 计算需要的restore命令个数,maxbulks表示一个restore命令可承载的bulk最大数目 */ long n = numberOfRestoreCommandsFromObject(val, maxbulks); if (n >= 2) { /* 如果需要2个及以上,则进入CHUNKED阶段,即启用分块传输 */ it->stage = STAGE_CHUNKED; /* chunked消息个数 */ it->chunked_msgs = n; } else { /* 否则一个restore可以承载,则直接进入PAYLOAD阶段 */ it->stage = STAGE_PAYLOAD; it->chunked_msgs = 0; } /* 这里的1为delete命令,再加上其他的前导命令(如果有),作为命令个数返回 */ return 1 + leading_msgs; } /* 取出key对应的值 */ robj *val = it->val; long long ttl = 0; if (it->stage == STAGE_CHUNKED) { /* 如果是CHUNKED阶段,则设置一个临时ttl */ ttl = timeout * 3; } else if (it->expire != -1) { /* 否则如果val上有过期时间,则重新计算ttl */ ttl = it->expire - mstime(); if (ttl < 1) { ttl = 1; } } /* 当一个CHUNKED对象全部序列化完成之后会到这个阶段 */ if (it->stage == STAGE_FILLTTL) { /* SLOTSRESTORE-ASYNC expire $key $ttl */ addReplyMultiBulkLen(c, 4); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "expire"); addReplyBulk(c, key); /* 设置真实的ttl */ addReplyBulkLongLong(c, ttl); /* 迭代结束 */ it->stage = STAGE_DONE; /* 该阶段只有一个命令 */ return 1; } /* 如果是PAYLOAD阶段切val类型不是OBJ_STRING */ if (it->stage == STAGE_PAYLOAD && val->type != OBJ_STRING) { /* 负载缓冲区 */ rio payload; /* 将val序列化为RDB格式 */ createDumpPayload(&payload, val); /* SLOTSRESTORE-ASYNC object $key $ttl $payload */ addReplyMultiBulkLen(c, 5); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); /* 对象类型 */ addReplyBulkCString(c, "object"); addReplyBulk(c, key); addReplyBulkLongLong(c, ttl); /* 添加payload */ addReplyBulkSds(c, payload.io.buffer.ptr); /* 迭代结束 */ it->stage = STAGE_DONE; /* 该阶段只有一个命令 */ return 1; } /* 如果是PAYLOAD阶段切val类型为OBJ_STRING */ if (it->stage == STAGE_PAYLOAD && val->type == OBJ_STRING) { /* SLOTSRESTORE-ASYNC string $key $ttl $payload */ addReplyMultiBulkLen(c, 5); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "string"); addReplyBulk(c, key); addReplyBulkLongLong(c, ttl); addReplyBulk(c, val); /* 迭代结束 */ it->stage = STAGE_DONE; /* 该阶段只有一个命令 */ return 1; } /* 如果是CHUNKED类型 */ if (it->stage == STAGE_CHUNKED) { const char *cmd = NULL; /* 根据val的类型使用不同的子命令 */ switch (val->type) { case OBJ_LIST: cmd = "list"; break; case OBJ_HASH: cmd = "hash"; break; case OBJ_SET: cmd = "dict"; break; case OBJ_ZSET: cmd = "zset"; break; default: serverPanic("unknown object type"); } /* 是否还有更多需要序列化 */ int more = 1; /* ll链表用于存放本次SLOTSRESTORE-ASYNC命令携带的args */ list *ll = listCreate(); /* 设置是否函数,本质就是调用decrRefCount */ listSetFreeMethod(ll, decrRefCountVoid); long long hint = 0, len = 0; if (val->type == OBJ_LIST) { /* 如果val类型为OBJ_LIST,则创建list迭代 */ listTypeIterator *li = listTypeInitIterator(val, it->lindex, LIST_TAIL); do { /* 表示list每一项 */ listTypeEntry entry; /* 遍历 */ if (listTypeNext(li, &entry)) { quicklistEntry *e = &(entry.entry); robj *obj; if (e->value) { /* */ obj = createStringObject((const char *)e->value, e->sz); } else { /* */ obj = createStringObjectFromLongLong(e->longval); } /* 累计字节数 */ len += sdslenOrElse(obj, 8); /* 添加到ll */ listAddNodeTail(ll, obj); /* 索引加1 */ it->lindex ++; } else { /* 没有更多了 */ more = 0; } /* 当还有更多要发送且ll现有元素个数小于maxbulks且字节数小于 maxbytes */ } while (more && listLength(ll) < maxbulks && len < maxbytes); /* 释放迭代器 */ listTypeReleaseIterator(li); /* 原list的总长度 */ hint = listTypeLength(val); } if (val->type == OBJ_HASH || val->type == OBJ_SET) { /* 控制循环次数 */ int loop = maxbulks * 10; /* 默认最大循环次数 */ if (loop < 100) { loop = 100; } dict *ht = val->ptr; void *pd[] = {ll, val, &len}; do { it->cursor = dictScan(ht, it->cursor, singleObjectIteratorScanCallback, pd); if (it->cursor == 0) { /* 没有更多了 */ more = 0; } /* 如果还有更多且ll现有元素个数小于maxbulks且本次发送字节数小于maxbytes且loop不为0 */ } while (more && listLength(ll) < maxbulks && len < maxbytes && (-- loop) >= 0); /* 原hash的总大小 */ hint = dictSize(ht); } if (val->type == OBJ_ZSET) { /* 如果是ZSET类型 */ zset *zs = val->ptr; dict *ht = zs->dict; long long rank = (long long)zsetLength(val) - it->zindex; zskiplistNode *node = (rank >= 1) ? zslGetElementByRank(zs->zsl, rank) : NULL; do { if (node != NULL) { robj *field = node->obj; incrRefCount(field); len += sdslenOrElse(field, 8); listAddNodeTail(ll, field); uint64_t bits = convertDoubleToRawBits(node->score); robj *score = createRawStringObjectFromUint64(bits); len += sdslenOrElse(score, 8); listAddNodeTail(ll, score); node = node->backward; it->zindex ++; } else { /* 没有更多了 */ more = 0; } /* 如果还有更多元素且bulks没有超过maxbulks且产生的字节数没有超过maxbytes */ } while (more && listLength(ll) < maxbulks && len < maxbytes); /* 原hash总大小 */ hint = dictSize(ht); } /* SLOTSRESTORE-ASYNC list/hash/zset/dict $key $ttl $hint [$arg1 ...] */ addReplyMultiBulkLen(c, 5 + listLength(ll));/* MultiBulk总长度 */ addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, cmd);/* list?hash? */ addReplyBulk(c, key); addReplyBulkLongLong(c, ttl);/* ttl */ addReplyBulkLongLong(c, hint);/* 总大小 */ /* 遍历ll,ll里面存放了本地要发送的args */ while (listLength(ll) != 0) { /* 取出头结点 */ listNode *head = listFirst(ll); /* 取出值对象 */ robj *obj = listNodeValue(head); /* 添加回复 */ addReplyBulk(c, obj); /* 删除该节点 */ listDelNode(ll, head); } /* 释放ll */ listRelease(ll); if (!more) { /* 如果对象所有元素都被序列换完毕,则进入FILLTTL阶段 */ it->stage = STAGE_FILLTTL; } /* 该阶段只有一个命令 */ return 1; } if (it->stage != STAGE_DONE) { serverPanic("invalid iterator stage"); } serverPanic("use of empty iterator"); } /* ============================ Iterator for Data Migration (batched) ====================== */ typedef struct { struct zskiplist *tags;/* 标识一个hashtag有没有被添加过 */ dict *keys;/* 批处理的Keys */ list *list; /* 每个节点的值都是singleObjectIterator */ dict *hash_slot;/* hash数组,数组的下标为slot_num,每个数组元素的字典为key、crc对 */ struct zskiplist *hash_tags;/* 用于保存具有hashtag的key,score为key的crc,值为key */ long long timeout;/* 进程chunked restore时会指定临时ttl,值为timeout*3 */ unsigned int maxbulks;/* 单次restore最多发送多少个bulks */ unsigned int maxbytes;/* 单次发送最多发送多少字节数 */ list *removed_keys;/* 一个key被发送完成之后会加入这个链表 */ list *chunked_vals;/* 用于存放使用chunked方式发生的val */ long estimate_msgs;/* 估算的restore命令的个数 */ } batchedObjectIterator; /* 创建batchedObjectIterator */ static batchedObjectIterator * createBatchedObjectIterator(dict *hash_slot, struct zskiplist *hash_tags, long long timeout, unsigned int maxbulks, unsigned int maxbytes) { batchedObjectIterator *it = zmalloc(sizeof(batchedObjectIterator)); it->tags = zslCreate(); it->keys = dictCreate(&setDictType, NULL); it->list = listCreate(); listSetFreeMethod(it->list, freeSingleObjectIteratorVoid); it->hash_slot = hash_slot; it->hash_tags = hash_tags; it->timeout = timeout; it->maxbulks = maxbulks; it->maxbytes = maxbytes; it->removed_keys = listCreate(); listSetFreeMethod(it->removed_keys, decrRefCountVoid); it->chunked_vals = listCreate(); listSetFreeMethod(it->chunked_vals, decrRefCountVoid); it->estimate_msgs = 0; return it; } /* 释放BatchedObjectIterator */ static void freeBatchedObjectIterator(batchedObjectIterator *it) { zslFree(it->tags); dictRelease(it->keys); listRelease(it->list); listRelease(it->removed_keys); listRelease(it->chunked_vals); zfree(it); } /* 批处理迭代(即一次处理多个key) */ static int batchedObjectIteratorHasNext(batchedObjectIterator *it) { /* list链表不为空,每个节点的值都是singleObjectIterator */ while (listLength(it->list) != 0) { /* 每个节点的值都是singleObjectIterator */ listNode *head = listFirst(it->list); /* 每个节点的值都是singleObjectIterator */ singleObjectIterator *sp = listNodeValue(head); /* 判断单个对象是否已经处于STAGE_DONE */ if (singleObjectIteratorHasNext(sp)) { /* 不处于STAGE_DONE,即单对象迭代还没结束,则直接返回1,下次还会迭代这个对象 */ return 1; } /* 否则当前单对象已经迭代结束 */ if (sp->val != NULL) { /* 如果当前单对象的value不为空,就把单对象的key添加到removed_keys链表 */ incrRefCount(sp->key); listAddNodeTail(it->removed_keys, sp->key); if (sp->chunked_msgs != 0) { /* 如果chunked的消息个数不为0 */ incrRefCount(sp->val); /* 就把val加入到chunked_vals链表 */ listAddNodeTail(it->chunked_vals, sp->val); } } /* 删除这个节点 */ listDelNode(it->list, head); } return 0; } /* 批处理对象迭代,返回值为本地迭代产生的SLOTSRESTORE系列命令的个数 */ static int batchedObjectIteratorNext(client *c, batchedObjectIterator *it) { /* 遍历链表 */ if (listLength(it->list) != 0) { /* 取出头结点 */ listNode *head = listFirst(it->list); /* 节点值为singleObjectIterator */ singleObjectIterator *sp = listNodeValue(head); /* maxbytes减去客户端输出缓冲区当前已有的大小就是本次能发送的最大字节数 */ long long maxbytes = (long long)it->maxbytes - getClientOutputBufferMemoryUsage(c); /* 单对象迭代,迭代超时timeout,迭代单词最大maxbulks,单次最大maxbytes */ return singleObjectIteratorNext(c, sp, it->timeout, it->maxbulks, maxbytes > 0 ? maxbytes : 0); } serverPanic("use of empty iterator"); } /* 批处理里面是否包含key,返回1表示存在,返回0表示不存在 */ static int batchedObjectIteratorContains(batchedObjectIterator *it, robj *key, int usetag) { /* 如果在keys中找到,则存在 */ if (dictFind(it->keys, key) != NULL) { return 1; } /* 如果没有使用hashtag则结束查找 */ if (!usetag) { return 0; } uint32_t crc; int hastag; /* 计算key的crc和hashtag */ slots_num(key->ptr, &crc, &hastag); if (!hastag) { /* 如果key没有hashtag则结束查找 */ return 0; } /* 否则填充range */ zrangespec range; range.min = (double)crc; range.minex = 0; range.max = (double)crc; range.maxex = 0; /* 以crc为范围在跳表tags中查找,每一个hashtag被添加都会在tags跳表中添加一个节点 */ return zslFirstInRange(it->tags, &range) != NULL; } /* 向批处理添加一个key,返回值为本次新添加的key的个数 */ static int batchedObjectIteratorAddKey(redisDb *db, batchedObjectIterator *it, robj *key) { /* 添加到keys字典 */ if (dictAdd(it->keys, key, NULL) != C_OK) { return 0; } /* 引用计数 */ incrRefCount(key); /* 创建createSingleObjectIterator */ listAddNodeTail(it->list, createSingleObjectIterator(key)); /* 对该对象需要的restore命令个数进行预估 */ it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks); /* 当前批处理的key个数 */ int size = dictSize(it->keys); uint32_t crc; int hastag; /* 该key对应的slot num */ slots_num(key->ptr, &crc, &hastag); if (!hastag) { /* 如果key不含有hashtag则跳出 */ goto out; } /* 知道score为crc */ zrangespec range; range.min = (double)crc; range.minex = 0; range.max = (double)crc; range.maxex = 0; /* 寻找第一个score满足 range范围的节点*/ if (zslFirstInRange(it->tags, &range) != NULL) { /* 找到则跳出,因此是该hashtag的key已经被添加过,无需重复添加 */ goto out; } /* 引用计数 */ incrRefCount(key); /* 没找到则插入,score为crc,节点的值为key */ zslInsert(it->tags, (double)crc, key); /* 如果hash_tags跳表指针为NULL */ if (it->hash_tags == NULL) { goto out; } /* 在hash_tags中寻找score满足range范围的第一个节点 */ zskiplistNode *node = zslFirstInRange(it->hash_tags, &range); /* 如果score不同就跳出 */ while (node != NULL && node->score == (double)crc) { /* 结点值就是key */ robj *key = node->obj; /* score相同的节点都是连续排列的,因此直接从level[0]向后遍历就好 */ node = node->level[0].forward; /* 添加到批处理keys */ if (dictAdd(it->keys, key, NULL) != C_OK) { continue; } /* 引用计数 */ incrRefCount(key); /* 为该key添加但对象迭代器SingleObjectIterator */ listAddNodeTail(it->list, createSingleObjectIterator(key)); /* 对该对象需要的restore命令个数进行预估 */ it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks); } out: /* 本次新加如的key的个数,注意最开始的1个key也要加上 */ return 1 + dictSize(it->keys) - size; } /* ============================ Clients ==================================================== */ /* 获取异步迁移客户端,每个db一个 */ static slotsmgrtAsyncClient * getSlotsmgrtAsyncClient(int db) { return &server.slotsmgrt_cached_clients[db]; } /* 通知被阻塞的 SlotsmgrtAsyncClient */ static void notifySlotsmgrtAsyncClient(slotsmgrtAsyncClient *ac, const char *errmsg) { /* 获取当前迭代器 */ batchedObjectIterator *it = ac->batched_iter; /* 获取阻塞链表 */ list *ll = ac->blocked_list; /* 遍历 */ while (listLength(ll) != 0) { /* 取出头节点 */ listNode *head = listFirst(ll); /* 取出节点值,就是client */ client *c = listNodeValue(head); if (errmsg != NULL) { /* 错误信息不为空,则将错误信息返回给client */ addReplyError(c, errmsg); } else if (it == NULL) { /* 迭代器非法 */ addReplyError(c, "invalid iterator (NULL)"); } else if (it->hash_slot == NULL) { addReplyLongLong(c, listLength(it->removed_keys)); } else { /* 返回两个值,一个是本次moved一个是hash_slot现在的大小 */ addReplyMultiBulkLen(c, 2); addReplyLongLong(c, listLength(it->removed_keys)); addReplyLongLong(c, dictSize(it->hash_slot)); } /* 清除CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT标志,表示这个客户端不是一个正在被使用、正常服务的客户端 */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 清空客户端阻塞链表 */ c->slotsmgrt_fenceq = NULL; /* 删除当前节点 */ listDelNode(ll, head); } } /* 释放slotsmgrtAsyncClient里面的结构 */ static void unlinkSlotsmgrtAsyncCachedClient(client *c, const char *errmsg) { slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); /* 必须有CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT标志,表示这是一个已经被cached的客户端 */ serverAssert(c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT); serverAssert(ac->c == c); /* 通知被阻塞的客户端,消息为errmsg */ notifySlotsmgrtAsyncClient(ac, errmsg); batchedObjectIterator *it = ac->batched_iter; /* 空闲时间 */ long long elapsed = mstime() - ac->lastuse; serverLog(LL_WARNING, "slotsmgrt_async: unlink client %s:%d (DB=%d): " "sending_msgs = %ld, batched_iter = %ld, blocked_list = %ld, " "timeout = %lld(ms), elapsed = %lld(ms) (%s)", ac->host, ac->port, c->db->id, ac->sending_msgs, it != NULL ? (long)listLength(it->list) : -1, (long)listLength(ac->blocked_list), ac->timeout, elapsed, errmsg); sdsfree(ac->host); if (it != NULL) { /* 释放批处理迭代器 */ freeBatchedObjectIterator(it); } /* 释放阻塞链表 */ listRelease(ac->blocked_list); /* 取消CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT,表示不是被缓存的slotsmgrtAsyncClient */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT; /* 情况结构,以备下一次使用(注意不需要free ac,因为这是每个db私有的) */ memset(ac, 0, sizeof(*ac)); } /* 释放一个db相关的SlotsmgrtAsyncClient */ static int releaseSlotsmgrtAsyncClient(int db, const char *errmsg) { slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); if (ac->c == NULL) { /* 为NULL无需释放 */ return 0; } client *c = ac->c; /* 释放slotsmgrtAsyncClient里面的结构 */ unlinkSlotsmgrtAsyncCachedClient(c, errmsg); /* 释放client结构 */ freeClient(c); return 1; } /* 新建一个slotsmgrtAsyncClient */ static int createSlotsmgrtAsyncClient(int db, char *host, int port, long timeout) { /* 新建连接 */ int fd = anetTcpNonBlockConnect(server.neterr, host, port); if (fd == -1) { serverLog(LL_WARNING, "slotsmgrt_async: create socket %s:%d (DB=%d) failed, %s", host, port, db, server.neterr); return C_ERR; } /* 禁用nagel算法 */ anetEnableTcpNoDelay(server.neterr, fd); int wait = 100; if (wait > timeout) { wait = timeout; } /* 等待可写状态 */ if ((aeWait(fd, AE_WRITABLE, wait) & AE_WRITABLE) == 0) { serverLog(LL_WARNING, "slotsmgrt_async: create socket %s:%d (DB=%d) failed, io error or timeout (%d)", host, port, db, wait); close(fd); return C_ERR; } /* 创建redis客户端,内部会将fd读事件添加到主线程eventloop */ client *c = createClient(fd); if (c == NULL) { serverLog(LL_WARNING, "slotsmgrt_async: create client %s:%d (DB=%d) failed, %s", host, port, db, server.neterr); return C_ERR; } /* 选择客户端绑定的db */ if (selectDb(c, db) != C_OK) { serverLog(LL_WARNING, "slotsmgrt_async: invalid DB index (DB=%d)", db); freeClient(c); return C_ERR; } /* 添加设置标志CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT,表示这是一个已经被CACHED的客户端 */ c->slotsmgrt_flags |= CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT; /* 已认证 */ c->authenticated = 1; /* 释放一个db相关的SlotsmgrtAsyncClient(清空里面的成员结构) */ releaseSlotsmgrtAsyncClient(db, "interrupted: build new connection"); serverLog(LL_WARNING, "slotsmgrt_async: create client %s:%d (DB=%d) OK", host, port, db); /* 根据db获取slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); /* 设置绑定的client */ ac->c = c; /* 没有被使用 */ ac->used = 0; /* ip */ ac->host = sdsnew(host); /* port */ ac->port = port; /* 空闲时间 */ ac->timeout = timeout; /* 更新最后一次使用时间 */ ac->lastuse = mstime(); /* 飞行中的消息计数 */ ac->sending_msgs = 0; /* 批处理迭代器 */ ac->batched_iter = NULL; /* 创建阻塞链表 */ ac->blocked_list = listCreate(); return C_OK; } /* 获取或创建一个slotsmgrtAsyncClient */ static slotsmgrtAsyncClient * getOrCreateSlotsmgrtAsyncClient(int db, char *host, int port, long timeout) { /* 根据要操作的db获取缓存的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); if (ac->c != NULL) { /* 不为NULL,在比较下host和port,只有完全线条才返回 */ if (ac->port == port && !strcmp(ac->host, host)) { return ac; } } /* 否则新建一个slotsmgrtAsyncClient */ return createSlotsmgrtAsyncClient(db, host, port, timeout) != C_OK ? NULL : ac; } static void unlinkSlotsmgrtAsyncNormalClient(client *c) { /* 释放一个正在被使用的、正常的client */ serverAssert(c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT); /* 阻塞链表不能为NULL */ serverAssert(c->slotsmgrt_fenceq != NULL); /* 该客户端阻塞的链表 */ list *ll = c->slotsmgrt_fenceq; /* 在阻塞链表中搜索该客户端 */ listNode *node = listSearchKey(ll, c); /* 必须能搜索到 */ serverAssert(node != NULL); /* 不再是一个正在被使用的、正常的client */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 不再阻塞也就没有阻塞链表 */ c->slotsmgrt_fenceq = NULL; /* 从阻塞链表中删除该客户端 */ listDelNode(ll, node); } void slotsmgrtAsyncUnlinkClient(client *c) { /* 针对CACHED类型客户端 */ if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT) { unlinkSlotsmgrtAsyncCachedClient(c, "interrupted: connection closed"); } /* 针对NORMAL类型客户端 */ if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) { unlinkSlotsmgrtAsyncNormalClient(c); } } /* 会被定期执行 */ void slotsmgrtAsyncCleanup() { /* 遍历所有db */ for (int i = 0; i < server.dbnum; i ++) { /* 获取每个db对应的 slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(i); if (ac->c == NULL) { continue; } /* 计算空闲时间 */ long long elapsed = mstime() - ac->lastuse; /* 提取客户端timeout */ long long timeout = ac->batched_iter != NULL ? ac->timeout : 1000LL * 60; if (elapsed <= timeout) { /* 如果空闲时间小于timeout则继续遍历 */ continue; } /* 否则就释放这个客户端 */ releaseSlotsmgrtAsyncClient(i, ac->batched_iter != NULL ? "interrupted: migration timeout" : "interrupted: idle timeout"); } } /* 获取异步迁移状态或者阻塞一个client */ static int getSlotsmgrtAsyncClientMigrationStatusOrBlock(client *c, robj *key, int block) { /* 获取当前db上的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); if (ac->c == NULL || ac->batched_iter == NULL) { /* 没有迁移或迁移完成 */ return 0; } /* 获取当前的batched_iter */ batchedObjectIterator *it = ac->batched_iter; if (key != NULL && !batchedObjectIteratorContains(it, key, 1)) { /* 如果key不为NULL且key不在batched中则直接返回0,表示该key没有迁移或者迁移完成 */ return 0; } if (!block) { /* 如果不允许阻塞则直接返回 */ return 1; } if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) { /* 如果这个客户端是一个CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT,即是一个 正在服务的slotsmgrtAsyncClient */ return -1; } /* 获取阻塞链表 */ list *ll = ac->blocked_list; /* 设置CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT标志,表示这是一个正常的被阻塞的客户端 */ c->slotsmgrt_flags |= CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 设置客户端阻塞在哪个链表上 */ c->slotsmgrt_fenceq = ll; /* 添加到阻塞队列 */ listAddNodeTail(ll, c); return 1; } /* ============================ Slotsmgrt{One,TagOne}AsyncDumpCommand ====================== */ /* SLOTSMGRTONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] */ /* SLOTSMGRTTAGONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] */ static void slotsmgrtAsyncDumpGenericCommand(client *c, int usetag) { long long timeout; /* 获取timeout */ if (getLongLongFromObject(c->argv[1], &timeout) != C_OK || !(timeout >= 0 && timeout <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of timeout (%s)", (char *)c->argv[1]->ptr); return; } /* 如果timeout为0就修正为30s */ if (timeout == 0) { timeout = 1000 * 30; } /* 获取maxbulks */ long long maxbulks; if (getLongLongFromObject(c->argv[2], &maxbulks) != C_OK || !(maxbulks >= 0 && maxbulks <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of maxbulks (%s)", (char *)c->argv[2]->ptr); return; } /* 如果maxbulks就修正为默认值3000 */ if (maxbulks == 0) { maxbulks = 1000; } /* 创建批处理迭代器,如果使用hashtag则提供 tagged_keys */ batchedObjectIterator *it = createBatchedObjectIterator(NULL, usetag ? c->db->tagged_keys : NULL, timeout, maxbulks, INT_MAX); /* 向批处理添加keys */ for (int i = 3; i < c->argc; i ++) { batchedObjectIteratorAddKey(c->db, it, c->argv[i]); } /* 添加一个空对象节点到复链表reply中,用于存放MultiBulk的长度 */ void *ptr = addDeferredMultiBulkLength(c); int total = 0; /* batched迭代 */ while (batchedObjectIteratorHasNext(it)) { /* batchedObjectIteratorNext返回本次迭代产生的SLOTSRESTORE系列命令的个数 */ total += batchedObjectIteratorNext(c, it); } /* 把真实的长度写进去 */ setDeferredMultiBulkLength(c, ptr, total); /* 释放批处理迭代器 */ freeBatchedObjectIterator(it); } /* * * SLOTSMGRTONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] * */ void slotsmgrtOneAsyncDumpCommand(client *c) { if (c->argc <= 3) { addReplyError(c, "wrong number of arguments for SLOTSMGRTONE-ASYNC-DUMP"); return; } slotsmgrtAsyncDumpGenericCommand(c, 0); } /* * * SLOTSMGRTTAGONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] * */ void slotsmgrtTagOneAsyncDumpCommand(client *c) { if (c->argc <= 3) { addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGONE-ASYNC-DUMP"); return; } slotsmgrtAsyncDumpGenericCommand(c, 1); } /* ============================ Slotsmgrt{One,TagOne,Slot,TagSlot}AsyncCommand ============= */ /* 根据配置的client_obuf参数来修正maxbytes */ static unsigned int slotsmgrtAsyncMaxBufferLimit(unsigned int maxbytes) { clientBufferLimitsConfig *config = &server.client_obuf_limits[CLIENT_TYPE_NORMAL]; if (config->soft_limit_bytes != 0 && config->soft_limit_bytes < maxbytes) { /* 如果配置的大小比soft_limit_bytes大则使用soft_limit_bytes */ maxbytes = config->soft_limit_bytes; } if (config->hard_limit_bytes != 0 && config->hard_limit_bytes < maxbytes) { /* 如果配置的大小比hard_limit_bytes大则使用hard_limit_bytes */ maxbytes = config->hard_limit_bytes; } return maxbytes; } /* 在给定长时间usecs内至少产生atleast条消息(一条消息代表一条SLOTSRESTORE命令) */ static long slotsmgrtAsyncNextMessagesMicroseconds(slotsmgrtAsyncClient *ac, long atleast, long long usecs) { /* 批处理迭代 */ batchedObjectIterator *it = ac->batched_iter; /* 阶段截止时间 */ long long deadline = ustime() + usecs; long msgs = 0; /* 如果批处理还有对象需要迭代切客户端输出缓冲区使用字节数小于maxbytes */ while (batchedObjectIteratorHasNext(it) && getClientOutputBufferMemoryUsage(ac->c) < it->maxbytes) { /* 批处理对象迭代,返回值为本地迭代产生的SLOTSRESTORE系列命令的个数 */ if ((msgs += batchedObjectIteratorNext(ac->c, it)) < atleast) { continue; } /* 如果已经超时就返回 */ if (ustime() >= deadline) { return msgs; } } /* 返回消息的个数 */ return msgs; } /* hash_slot的扫描函数 */ static void slotsScanSdsKeyCallback(void *l, const dictEntry *de) { sds skey = dictGetKey(de); robj *key = createStringObject(skey, sdslen(skey)); /* 将key添加都链表 */ listAddNodeTail((list *)l, key); } /* SLOTSMGRTONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...] */ /* SLOTSMGRTTAGONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...] */ /* SLOTSMGRTSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys */ /* SLOTSMGRTTAGSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys */ static void slotsmgrtAsyncGenericCommand(client *c, int usetag, int usekey) { /* 提取host和port */ char *host = c->argv[1]->ptr; long long port; if (getLongLongFromObject(c->argv[2], &port) != C_OK || !(port >= 1 && port < 65536)) { addReplyErrorFormat(c, "invalid value of port (%s)", (char *)c->argv[2]->ptr); return; } /* 提取timeout,用于chunk迁移时的临时ttl */ long long timeout; if (getLongLongFromObject(c->argv[3], &timeout) != C_OK || !(timeout >= 0 && timeout <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of timeout (%s)", (char *)c->argv[3]->ptr); return; } /* 默认30S */ if (timeout == 0) { timeout = 1000 * 30; } /* 提取maxbulks,用于觉得每个chunk能鞋底的bulk数目 */ long long maxbulks; if (getLongLongFromObject(c->argv[4], &maxbulks) != C_OK || !(maxbulks >= 0 && maxbulks <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of maxbulks (%s)", (char *)c->argv[4]->ptr); return; } if (maxbulks == 0) { maxbulks = 200; } /* 最大512K */ if (maxbulks > 512 * 1024) { maxbulks = 512 * 1024; } /* 提取 maxbytes,用于决定单词迁移发送的最大字节数 */ long long maxbytes; if (getLongLongFromObject(c->argv[5], &maxbytes) != C_OK || !(maxbytes >= 0 && maxbytes <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of maxbytes (%s)", (char *)c->argv[5]->ptr); return; } if (maxbytes == 0) { maxbytes = 512 * 1024; } if (maxbytes > INT_MAX / 2) { maxbytes = INT_MAX / 2; } /* 根据客户端配置的outbuf大小修正maxbytes */ maxbytes = slotsmgrtAsyncMaxBufferLimit(maxbytes); dict *hash_slot = NULL; long long numkeys = 0; if (!usekey) { /* 不是SLOTSMGRTTAGONE-ASYNC和SLOTSMGRTONE-ASYNC,即不指定key迁移 则提取slotnum */ long long slotnum; if (getLongLongFromObject(c->argv[6], &slotnum) != C_OK || !(slotnum >= 0 && slotnum < HASH_SLOTS_SIZE)) { addReplyErrorFormat(c, "invalid value of slot (%s)", (char *)c->argv[6]->ptr); return; } /* 获取hash_slot字典 */ hash_slot = c->db->hash_slots[slotnum]; /* 提取numkeys */ if (getLongLongFromObject(c->argv[7], &numkeys) != C_OK || !(numkeys >= 0 && numkeys <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of numkeys (%s)", (char *)c->argv[7]->ptr); return; } /* 如果numkeys为0就默认为每次迁移100 */ if (numkeys == 0) { numkeys = 100; } } /* DB是否正处于迁移状态 */ if (getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 0) != 0) { addReplyError(c, "the specified DB is being migrated"); return; } /* 带有CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT标志的客户端是一个被阻塞正在等待操作结束的客户端 */ if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) { addReplyError(c, "previous operation has not finished"); return; } /* 获取或创建一个slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getOrCreateSlotsmgrtAsyncClient(c->db->id, host, port, timeout); if (ac == NULL) { addReplyErrorFormat(c, "create client to %s:%d failed", host, (int)port); return; } /* 创建批处理迭代器 */ batchedObjectIterator *it = createBatchedObjectIterator(hash_slot, usetag ? c->db->tagged_keys : NULL, timeout, maxbulks, maxbytes); if (!usekey) { /* 创建一个链表ll,用于存放从hash_slot扫描出来的数据 */ list *ll = listCreate(); listSetFreeMethod(ll, decrRefCountVoid); for (int i = 2; i >= 0 && it->estimate_msgs < numkeys; i --) { unsigned long cursor = 0; if (i != 0) { cursor = random(); } else { if (htNeedsResize(hash_slot)) { dictResize(hash_slot); } } if (dictIsRehashing(hash_slot)) { dictRehash(hash_slot, 50); } int loop = numkeys * 10; if (loop < 100) { loop = 100; } do { /* slotsScanSdsKeyCallback里面会把扫描出来的key添加都ll中 */ cursor = dictScan(hash_slot, cursor, slotsScanSdsKeyCallback, ll); while (listLength(ll) != 0 && it->estimate_msgs < numkeys) { listNode *head = listFirst(ll); robj *key = listNodeValue(head); long msgs = estimateNumberOfRestoreCommands(c->db, key, it->maxbulks); if (it->estimate_msgs == 0 || it->estimate_msgs + msgs <= numkeys * 2) { batchedObjectIteratorAddKey(c->db, it, key); } listDelNode(ll, head); } /* */ } while (cursor != 0 && it->estimate_msgs < numkeys && dictSize(it->keys) < (unsigned long)numkeys && (-- loop) >= 0); } listRelease(ll); } else { /* 否则就是指定key的迁移 */ for (int i = 6; i < c->argc; i ++) { batchedObjectIteratorAddKey(c->db, it, c->argv[i]); } } /* 当前没有正在发送的消息 */ serverAssert(ac->sending_msgs == 0); /* 客户端阻塞链表也为空 */ serverAssert(ac->batched_iter == NULL && listLength(ac->blocked_list) == 0); ac->timeout = timeout; /* 更新最后使用时间 */ ac->lastuse = mstime(); ac->batched_iter = it; /* 在500ms内至少产生3条命令 */ ac->sending_msgs = slotsmgrtAsyncNextMessagesMicroseconds(ac, 3, 500); /* 判断db是否在迁移状态,如果是则阻塞 */ getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 1); if (ac->sending_msgs != 0) { return; } notifySlotsmgrtAsyncClient(ac, NULL); ac->batched_iter = NULL; freeBatchedObjectIterator(it); } /* * * SLOTSMGRTONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...] * */ void slotsmgrtOneAsyncCommand(client *c) { if (c->argc <= 6) { addReplyError(c, "wrong number of arguments for SLOTSMGRTONE-ASYNC"); return; } slotsmgrtAsyncGenericCommand(c, 0, 1); } /* * * SLOTSMGRTTAGONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...] * */ void slotsmgrtTagOneAsyncCommand(client *c) { if (c->argc <= 6) { addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGONE-ASYNC"); return; } slotsmgrtAsyncGenericCommand(c, 1, 1); } /* * * SLOTSMGRTSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys * */ void slotsmgrtSlotAsyncCommand(client *c) { if (c->argc != 8) { addReplyError(c, "wrong number of arguments for SLOTSMGRTSLOT-ASYNC"); return; } slotsmgrtAsyncGenericCommand(c, 0, 0); } /* * * SLOTSMGRTTAGSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys * */ void slotsmgrtTagSlotAsyncCommand(client *c) { if (c->argc != 8) { addReplyError(c, "wrong number of arguments for SLOTSMGRTSLOT-ASYNC"); return; } slotsmgrtAsyncGenericCommand(c, 1, 0); } /* * * SLOTSMGRT-ASYNC-FENCE * */ void slotsmgrtAsyncFenceCommand(client *c) { /* 获取异步迁移状态或者阻塞一个client */ int ret = getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 1); if (ret == 0) { /* 没有阻塞,说明当前没有迁移任务 */ addReply(c, shared.ok); } else if (ret != 1) { /* 正常情况下如果客户端成功阻塞,会返回1 */ addReplyError(c, "previous operation has not finished (call fence again)"); } /* 返回1的情况下,客户端暂时不会受到任何返回,后续迁移完成后会收到最终通知 */ } /* * * SLOTSMGRT-ASYNC-CANCEL * */ void slotsmgrtAsyncCancelCommand(client *c) { addReplyLongLong(c, releaseSlotsmgrtAsyncClient(c->db->id, "interrupted: canceled")); } /* ============================ SlotsmgrtAsyncStatus ======================================= */ static void singleObjectIteratorStatus(client *c, singleObjectIterator *it) { if (it == NULL) { addReply(c, shared.nullmultibulk); return; } void *ptr = addDeferredMultiBulkLength(c); int fields = 0; fields ++; addReplyBulkCString(c, "key"); addReplyBulk(c, it->key); fields ++; addReplyBulkCString(c, "val.type"); addReplyBulkLongLong(c, it->val == NULL ? -1 : it->val->type); fields ++; addReplyBulkCString(c, "stage"); addReplyBulkLongLong(c, it->stage); fields ++; addReplyBulkCString(c, "expire"); addReplyBulkLongLong(c, it->expire); fields ++; addReplyBulkCString(c, "cursor"); addReplyBulkLongLong(c, it->cursor); fields ++; addReplyBulkCString(c, "lindex"); addReplyBulkLongLong(c, it->lindex); fields ++; addReplyBulkCString(c, "zindex"); addReplyBulkLongLong(c, it->zindex); fields ++; addReplyBulkCString(c, "chunked_msgs"); addReplyBulkLongLong(c, it->chunked_msgs); setDeferredMultiBulkLength(c, ptr, fields * 2); } /* batchedObjectIterator的状态 */ static void batchedObjectIteratorStatus(client *c, batchedObjectIterator *it) { if (it == NULL) { addReply(c, shared.nullmultibulk); return; } void *ptr = addDeferredMultiBulkLength(c); int fields = 0; fields ++; addReplyBulkCString(c, "keys"); addReplyMultiBulkLen(c, 2); addReplyBulkLongLong(c, dictSize(it->keys)); addReplyMultiBulkLen(c, dictSize(it->keys)); dictIterator *di = dictGetIterator(it->keys); dictEntry *de; while((de = dictNext(di)) != NULL) { addReplyBulk(c, dictGetKey(de)); } dictReleaseIterator(di); fields ++; addReplyBulkCString(c, "timeout"); addReplyBulkLongLong(c, it->timeout); fields ++; addReplyBulkCString(c, "maxbulks"); addReplyBulkLongLong(c, it->maxbulks); fields ++; addReplyBulkCString(c, "maxbytes"); addReplyBulkLongLong(c, it->maxbytes); fields ++; addReplyBulkCString(c, "estimate_msgs"); addReplyBulkLongLong(c, it->estimate_msgs); fields ++; addReplyBulkCString(c, "removed_keys"); addReplyBulkLongLong(c, listLength(it->removed_keys)); fields ++; addReplyBulkCString(c, "chunked_vals"); addReplyBulkLongLong(c, listLength(it->chunked_vals)); fields ++; addReplyBulkCString(c, "iterators"); addReplyMultiBulkLen(c, 2); addReplyBulkLongLong(c, listLength(it->list)); singleObjectIterator *sp = NULL; if (listLength(it->list) != 0) { sp = listNodeValue(listFirst(it->list)); } singleObjectIteratorStatus(c, sp); setDeferredMultiBulkLength(c, ptr, fields * 2); } /* * * SLOTSMGRT-ASYNC-STATUS * */ void slotsmgrtAsyncStatusCommand(client *c) { /* */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); if (ac->c == NULL) { addReply(c, shared.nullmultibulk); return; } /* 预留MultiBulk长度 */ void *ptr = addDeferredMultiBulkLength(c); int fields = 0; fields ++; addReplyBulkCString(c, "host"); addReplyBulkCString(c, ac->host); fields ++; addReplyBulkCString(c, "port"); addReplyBulkLongLong(c, ac->port); fields ++; addReplyBulkCString(c, "used"); addReplyBulkLongLong(c, ac->used); fields ++; addReplyBulkCString(c, "timeout"); addReplyBulkLongLong(c, ac->timeout); fields ++; addReplyBulkCString(c, "lastuse"); addReplyBulkLongLong(c, ac->lastuse); fields ++; addReplyBulkCString(c, "since_lastuse"); addReplyBulkLongLong(c, mstime() - ac->lastuse); fields ++; addReplyBulkCString(c, "sending_msgs"); addReplyBulkLongLong(c, ac->sending_msgs); /* 被阻塞的客户端的个数 */ fields ++; addReplyBulkCString(c, "blocked_clients"); addReplyBulkLongLong(c, listLength(ac->blocked_list)); fields ++; addReplyBulkCString(c, "batched_iterator"); batchedObjectIteratorStatus(c, ac->batched_iter); /* 设置MultiBulk长度 */ setDeferredMultiBulkLength(c, ptr, fields * 2); } /* ============================ SlotsmgrtExecWrapper ======================================= */ /* * * SLOTSMGRT-EXEC-WRAPPER $hashkey $command [$arg1 ...] * */ void slotsmgrtExecWrapperCommand(client *c) { /* MultiBulk长度2 */ addReplyMultiBulkLen(c, 2); if (c->argc < 3) { addReplyLongLong(c, -1); addReplyError(c, "wrong number of arguments for SLOTSMGRT-EXEC-WRAPPER"); return; } /* 查找命令 */ struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr); if (cmd == NULL) { addReplyLongLong(c, -1); addReplyErrorFormat(c,"invalid command specified (%s)", (char *)c->argv[2]->ptr); return; } if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) { addReplyLongLong(c, -1); addReplyErrorFormat(c, "wrong number of arguments for command (%s)", (char *)c->argv[2]->ptr); return; } /* 写的方式查找key */ if (lookupKeyWrite(c->db, c->argv[1]) == NULL) { addReplyLongLong(c, 0); addReplyError(c, "the specified key doesn't exist"); return; } /* 如果是写命令且 c->argv[1]正处于迁移状态,不会阻塞客户端 */ if (!(cmd->flags & CMD_READONLY) && getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) { /* 返回1 */ addReplyLongLong(c, 1); addReplyError(c, "the specified key is being migrated"); return; } else { /* 返回2表示正常 */ addReplyLongLong(c, 2); robj **argv = zmalloc(sizeof(robj *) * (c->argc - 2)); for (int i = 2; i < c->argc; i ++) { argv[i - 2] = c->argv[i]; incrRefCount(c->argv[i]); } /* 被重复引用计数的要减去 */ for (int i = 0; i < c->argc; i ++) { decrRefCount(c->argv[i]); } zfree(c->argv); c->argc = c->argc - 2; c->argv = argv; c->cmd = cmd; /* 调用被包装的命令 */ call(c, CMD_CALL_FULL & ~CMD_CALL_PROPAGATE); } } /* ============================ SlotsrestoreAsync Commands ================================= */ /* SLOTSRESTORE-ASYNC的回复 */ static void slotsrestoreReplyAck(client *c, int err_code, const char *fmt, ...) { va_list ap; va_start(ap, fmt); sds s = sdscatvprintf(sdsempty(), fmt, ap); va_end(ap); addReplyMultiBulkLen(c, 3); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-ACK"); addReplyBulkLongLong(c, err_code); addReplyBulkSds(c, s); if (err_code != 0) { /* 如果有错误则回复之后关闭客户端 */ c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } extern int verifyDumpPayload(unsigned char *p, size_t len); /* slotsrestore-async命令具体处理 */ static int slotsrestoreAsyncHandle(client *c) { /* 获取本节点上异步迁移状态,即使在迁移也不会阻塞这个client */ if (getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 0) != 0) { /* 本节点当前db上正在执行迁移,不能响应slotsrestore-async命令 */ slotsrestoreReplyAck(c, -1, "the specified DB is being migrated"); return C_ERR; } const char *cmd = ""; /* 参数校验 */ if (c->argc < 2) { goto bad_arguments_number; } cmd = c->argv[1]->ptr; /* ==================================================== */ /* SLOTSRESTORE-ASYNC $cmd $key [$ttl $arg1, $arg2 ...] */ /* ==================================================== */ if (c->argc < 3) { goto bad_arguments_number; } robj *key = c->argv[2]; /* SLOTSRESTORE-ASYNC delete $key */ if (!strcasecmp(cmd, "delete")) { if (c->argc != 3) { goto bad_arguments_number; } /* 同步删除 */ int deleted = dbDelete(c->db, key); if (deleted) { /* 删除成功,通知所有watch该key的client */ signalModifiedKey(c->db, key); /* 脏计数 */ server.dirty ++; } /* 回复,成删除回复1,没有删除则返回0 */ slotsrestoreReplyAck(c, 0, deleted ? "1" : "0"); return C_OK; } /* ==================================================== */ /* SLOTSRESTORE-ASYNC $cmd $key $ttl [$arg1, $arg2 ...] */ /* ==================================================== */ if (c->argc < 4) { goto bad_arguments_number; } /* 提取ttl */ long long ttl; if (getLongLongFromObject(c->argv[3], &ttl) != C_OK || ttl < 0) { slotsrestoreReplyAck(c, -1, "invalid TTL value (TTL=%s)", c->argv[3]->ptr); return C_ERR; } /* SLOTSRESTORE-ASYNC expire $key $ttl */ if (!strcasecmp(cmd, "expire")) { /* 参数校验 */ if (c->argc != 4) { goto bad_arguments_number; } /* 查看key是否存在 */ if (lookupKeyWrite(c->db, key) == NULL) { slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr); return C_ERR; } /* 响应 */ slotsrestoreReplyAck(c, 0, "1"); /* 会执过期设置 */ goto success_common; } /* SLOTSRESTORE-ASYNC string $key $ttl $payload */ if (!strcasecmp(cmd, "string")) { /* 参数校验 */ if (c->argc != 5) { goto bad_arguments_number; } /* 查看key是否存在 */ if (lookupKeyWrite(c->db, key) != NULL) { slotsrestoreReplyAck(c, -1, "the specified key already exists (%s)", key->ptr); return C_ERR; } /* 对val编码 */ robj *val = c->argv[4] = tryObjectEncoding(c->argv[4]); /* 添加到db */ dbAdd(c->db, key, val); /* 引用计数 */ incrRefCount(val); /* 响应 */ slotsrestoreReplyAck(c, 0, "1"); /* 会执过期设置 */ goto success_common; } /* SLOTSRESTORE-ASYNC object $key $ttl $payload */ if (!strcasecmp(cmd, "object")) { /* 参数校验 */ if (c->argc != 5) { goto bad_arguments_number; } /* 查看key是否存在 */ if (lookupKeyWrite(c->db, key) != NULL) { slotsrestoreReplyAck(c, -1, "the specified key already exists (%s)", key->ptr); return C_ERR; } void *bytes = c->argv[4]->ptr; rio payload; /* 校验RDB序列化格式 */ if (verifyDumpPayload(bytes, sdslen(bytes)) != C_OK) { slotsrestoreReplyAck(c, -1, "invalid payload checksum"); return C_ERR; } /* 初始化payload */ rioInitWithBuffer(&payload, bytes); /* 获取对象类型 */ int type = rdbLoadObjectType(&payload); if (type == -1) { slotsrestoreReplyAck(c, -1, "invalid payload type"); return C_ERR; } /* 获取值对象 */ robj *val = rdbLoadObject(type, &payload); if (val == NULL) { slotsrestoreReplyAck(c, -1, "invalid payload body"); return C_ERR; } /* 添加到db */ dbAdd(c->db, key, val); /* 响应 */ slotsrestoreReplyAck(c, 0, "1"); /* 会执过期设置 */ goto success_common; } /* ========================================================== */ /* SLOTSRESTORE-ASYNC $cmd $key $ttl $hint [$arg1, $arg2 ...] */ /* ========================================================== */ /* 参数校验 */ if (c->argc < 5) { goto bad_arguments_number; } /* 提取总长度hint */ long long hint; if (getLongLongFromObject(c->argv[4], &hint) != C_OK || hint < 0) { slotsrestoreReplyAck(c, -1, "invalid Hint value (Hint=%s)", c->argv[4]->ptr); return C_ERR; } int xargc = c->argc - 5; robj **xargv = &c->argv[5]; /* SLOTSRESTORE-ASYNC list $key $ttl $hint [$elem1 ...] */ if (!strcasecmp(cmd, "list")) { /* 查看key是否存在 */ robj *val = lookupKeyWrite(c->db, key); if (val != NULL) { /* 如果key已经存在,则val类型必须为OBJ_LIST切编码类型必须为OBJ_ENCODING_QUICKLIST */ if (val->type != OBJ_LIST || val->encoding != OBJ_ENCODING_QUICKLIST) { slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)", OBJ_LIST, OBJ_ENCODING_QUICKLIST, val->type, val->encoding); return C_ERR; } } else { /* 否则key不存在 */ if (xargc == 0) { slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr); return C_ERR; } /* 常见Quicklist对象 */ val = createQuicklistObject(); /* 设置选项 */ quicklistSetOptions(val->ptr, server.list_max_ziplist_size, server.list_compress_depth); /* 添加到db */ dbAdd(c->db, key, val); } /* 将所有的args添加到val Quicklist中 */ for (int i = 0; i < xargc; i ++) { xargv[i] = tryObjectEncoding(xargv[i]); listTypePush(val, xargv[i], LIST_TAIL); } /* 返回值为val当前总长度 */ slotsrestoreReplyAck(c, 0, "%d", listTypeLength(val)); goto success_common; } /* SLOTSRESTORE-ASYNC hash $key $ttl $hint [$hkey1 $hval1 ...] */ if (!strcasecmp(cmd, "hash")) { /* 对于hash类型args必须是偶数 */ if (xargc % 2 != 0) { goto bad_arguments_number; } /* 先查找key */ robj *val = lookupKeyWrite(c->db, key); if (val != NULL) { /* key已存在,则类型必须为OBJ_HASH,编码类型必须为OBJ_ENCODING_HT */ if (val->type != OBJ_HASH || val->encoding != OBJ_ENCODING_HT) { slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)", OBJ_HASH, OBJ_ENCODING_HT, val->type, val->encoding); return C_ERR; } } else { if (xargc == 0) { slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr); return C_ERR; } /* 不存在就创建hash对象 */ val = createHashObject(); if (val->encoding != OBJ_ENCODING_HT) { hashTypeConvert(val, OBJ_ENCODING_HT); } /* 添加到db */ dbAdd(c->db, key, val); } /* 如果总长度不为0 */ if (hint != 0) { dict *ht = val->ptr; /* 使用hint创建或者扩展ht */ dictExpand(ht, hint); } /* 顺序添加 */ for (int i = 0; i < xargc; i += 2) { /* field */ hashTypeTryObjectEncoding(val, &xargv[i], &xargv[i + 1]); /* value */ hashTypeSet(val, xargv[i], xargv[i + 1]); } /* 返回值为val当前总长度 */ slotsrestoreReplyAck(c, 0, "%d", hashTypeLength(val)); goto success_common; } /* SLOTSRESTORE-ASYNC dict $key $ttl $hint [$elem1 ...] */ if (!strcasecmp(cmd, "dict")) { /* 先查找key */ robj *val = lookupKeyWrite(c->db, key); if (val != NULL) { /* key已存在,则类型必须为OBJ_SET,编码类型必须为OBJ_ENCODING_HT */ if (val->type != OBJ_SET || val->encoding != OBJ_ENCODING_HT) { slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)", OBJ_SET, OBJ_ENCODING_HT, val->type, val->encoding); return C_ERR; } } else { if (xargc == 0) { slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr); return C_ERR; } /* 不存在就创建set对象 */ val = createSetObject(); if (val->encoding != OBJ_ENCODING_HT) { setTypeConvert(val, OBJ_ENCODING_HT); } /* 添加到db */ dbAdd(c->db, key, val); } /* 如果总长度不为0 */ if (hint != 0) { dict *ht = val->ptr; /* 使用hint创建或者扩展ht */ dictExpand(ht, hint); } /* 顺序添加 */ for (int i = 0; i < xargc; i ++) { /* feild */ xargv[i] = tryObjectEncoding(xargv[i]); /* val */ setTypeAdd(val, xargv[i]); } /* 返回值为val当前总长度 */ slotsrestoreReplyAck(c, 0, "%d", setTypeSize(val)); goto success_common; } /* SLOTSRESTORE-ASYNC zset $key $ttl $hint [$elem1 $score1 ...] */ if (!strcasecmp(cmd, "zset")) { /* zset参数也必须是偶数,elem1和score配对 */ if (xargc % 2 != 0) { goto bad_arguments_number; } /* 提取score */ double *scores = zmalloc(sizeof(double) * xargc / 2); for (int i = 1, j = 0; i < xargc; i += 2, j ++) { uint64_t bits; if (getUint64FromRawStringObject(xargv[i], &bits) != C_OK) { zfree(scores); slotsrestoreReplyAck(c, -1, "invalid zset score ([%d]), bad raw bits", j); return C_ERR; } scores[j] = convertRawBitsToDouble(bits); } /* */ robj *val = lookupKeyWrite(c->db, key); if (val != NULL) { /* val已经存在,校验类型 */ if (val->type != OBJ_ZSET || val->encoding != OBJ_ENCODING_SKIPLIST) { zfree(scores); slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)", OBJ_ZSET, OBJ_ENCODING_SKIPLIST, val->type, val->encoding); return C_ERR; } } else { /* 不存在 */ if (xargc == 0) { zfree(scores); slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr); return C_ERR; } /* 否则就创建zset对象 */ val = createZsetObject(); if (val->encoding != OBJ_ENCODING_SKIPLIST) { zsetConvert(val, OBJ_ENCODING_SKIPLIST); } /* 添加到db */ dbAdd(c->db, key, val); } zset *zset = val->ptr; /* 如果总长度不为0 */ if (hint != 0) { dict *ht = zset->dict; /* 就创建或修正hash大小为hint */ dictExpand(ht, hint); } /* 顺序添加 */ for (int i = 0, j = 0; i < xargc; i += 2, j ++) { robj *elem = xargv[i] = tryObjectEncoding(xargv[i]); dictEntry *de = dictFind(zset->dict, elem); if (de != NULL) { /* score */ double score = *(double *)dictGetVal(de); zslDelete(zset->zsl, score, elem); /* memeber */ dictDelete(zset->dict, elem); } /* 添加elem */ zskiplistNode *znode = zslInsert(zset->zsl, scores[j], elem); /* 引用计数 */ incrRefCount(elem); /* 添加score */ dictAdd(zset->dict, elem, &(znode->score)); /* 引用计数 */ incrRefCount(elem); } zfree(scores); /* 返回值为val当前总长度 */ slotsrestoreReplyAck(c, 0, "%d", zsetLength(val)); goto success_common; } slotsrestoreReplyAck(c, -1, "unknown command (argc=%d,cmd=%s)", c->argc, cmd); return C_ERR; bad_arguments_number: slotsrestoreReplyAck(c, -1, "wrong number of arguments (argc=%d,cmd=%s)", c->argc, cmd); return C_ERR; success_common: /* ttl不为0就设置过期,否则就删除过期设置 */ if (ttl != 0) { setExpire(c->db, key, mstime() + ttl); } else { removeExpire(c->db, key); } /* 通知watched */ signalModifiedKey(c->db, key); /* 脏计数 */ server.dirty ++; return C_OK; } /* * * SLOTSRESTORE-ASYNC delete $key * expire $key $ttl * object $key $ttl $payload * string $key $ttl $payload * list $key $ttl $hint [$elem1 ...] * hash $key $ttl $hint [$hkey1 $hval1 ...] * dict $key $ttl $hint [$elem1 ...] * zset $key $ttl $hint [$elem1 $score1 ...] * */ void slotsrestoreAsyncCommand(client *c) { /* slotsrestore-async命令处理 */ if (slotsrestoreAsyncHandle(c) != C_OK) { c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } /* 目的实例发送SLOTSRESTORE-ASYNC-ACK的处理 */ static int slotsrestoreAsyncAckHandle(client *c) { /* 获取该db上对应的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); if (ac->c != c) { /* 必须是同一个客户端发送的SLOTSRESTORE-ASYNC-ACK才合法 */ addReplyErrorFormat(c, "invalid client, permission denied"); return C_ERR; } /* 参数校验,格式:SLOTSRESTORE-ASYNC-ACK $errno $message */ if (c->argc != 3) { addReplyError(c, "wrong number of arguments for SLOTSRESTORE-ASYNC-ACK"); return C_ERR; } long long errcode; if (getLongLongFromObject(c->argv[1], &errcode) != C_OK) { addReplyErrorFormat(c, "invalid errcode (%s)", (char *)c->argv[1]->ptr); return C_ERR; } /* 如果有错误这个就是错误的描述信息 */ const char *errmsg = c->argv[2]->ptr; if (errcode != 0) { /* 错误码不为0则打印错误 */ serverLog(LL_WARNING, "slotsmgrt_async: ack[%d] %s", (int)errcode, errmsg != NULL ? errmsg : "(null)"); return C_ERR; } /* batched_iter校验,理论上在有迁移状态下不能为NULL */ if (ac->batched_iter == NULL) { serverLog(LL_WARNING, "slotsmgrt_async: null batched iterator"); addReplyError(c, "invalid iterator (NULL)"); return C_ERR; } /* 正在发送的消息个数(飞行中的消息) */ if (ac->sending_msgs == 0) { serverLog(LL_WARNING, "slotsmgrt_async: invalid message counter"); addReplyError(c, "invalid pending messages"); return C_ERR; } /* 更新slotsmgrtAsyncClient最后一次被使用的时间 */ ac->lastuse = mstime(); /* 飞行中的消息个数减一(即一条restore命令收到了一个ack) */ ac->sending_msgs -= 1; /* 继续产生新的restore命令(在给定10ms内至少产生2条消息) */ ac->sending_msgs += slotsmgrtAsyncNextMessagesMicroseconds(ac, 2, 10); /* 如果还有正在发送的消息(即发出去还没收到ACK) */ if (ac->sending_msgs != 0) { return C_OK; } /* 通知客户端 */ notifySlotsmgrtAsyncClient(ac, NULL); /* 获取批处理迭代器 */ batchedObjectIterator *it = ac->batched_iter; if (listLength(it->removed_keys) != 0) { /* 如果被移走的key个数不为0 */ list *ll = it->removed_keys; for (int i = 0; i < c->argc; i ++) { /* 遍历removed_keys链表,对其引用计数减一 */ decrRefCount(c->argv[i]); } /* 释放客户端当前的参数结构 */ zfree(c->argv); /* DEL key1 key2 key2 ... */ c->argc = 1 + listLength(ll); /* 分配argv结构 */ c->argv = zmalloc(sizeof(robj *) * c->argc); for (int i = 1; i < c->argc; i ++) { /* 遍历、填充argv */ listNode *head = listFirst(ll); /* 获取被移走的key */ robj *key = listNodeValue(head); /* 将其从db中删除 */ if (dbDelete(c->db, key)) { /* 通知key空间 */ signalModifiedKey(c->db, key); /* 脏计数 */ server.dirty ++; } /* 填充argv */ c->argv[i] = key; /* 引用计数 */ incrRefCount(key); /* 删除当前节点 */ listDelNode(ll, head); } /* 填充 argv[0] */ c->argv[0] = createStringObject("DEL", 3); /* 注意,虽然客户端发来的是SLOTSRESTORE-ASYNC-ACK命令, 但是此时我们已经将其改写为一条DEL命令,该函数退出后,会被 propagate写到AOF文件和所有slaves */ } /* 用于存放使用chunked方式发生的val */ if (listLength(it->chunked_vals) != 0) { list *ll = it->chunked_vals; /* 遍历 chunked_vals链表 */ while (listLength(ll) != 0) { /* 头结点 */ listNode *head = listFirst(ll); /* 提取节点值 */ robj *o = listNodeValue(head); /* 引用计数 */ incrRefCount(o); /* 删除当前节点 */ listDelNode(ll, head); /* 如果当前对象refcount不为1就先decrRefCount */ if (o->refcount != 1) { decrRefCount(o); } else { /* 否则refcount为1就lazy释放 */ lazyReleaseObject(o); } } } ac->batched_iter = NULL; freeBatchedObjectIterator(it); return C_OK; } /* * * SLOTSRESTORE-ASYNC-ACK $errno $message * */ void slotsrestoreAsyncAckCommand(client *c) { /* 调用slotsrestoreAsyncAckHandle进一步处理 */ if (slotsrestoreAsyncAckHandle(c) != C_OK) { /* Close after writing entire reply. */ c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } extern int time_independent_strcmp(const char *a, const char *b); /* * * SLOTSRESTORE-ASYNC-AUTH $passwd * */ void slotsrestoreAsyncAuthCommand(client *c) { if (!server.requirepass) { /* 如果服务端没有设置密码则返回错误 */ slotsrestoreReplyAck(c, -1, "Client sent AUTH, but no password is set"); return; } if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) { /* 密码匹配成功则设置客户端的authenticated标志,并响应ok */ c->authenticated = 1; slotsrestoreReplyAck(c, 0, "OK"); } else { /* 密码匹配失败 */ c->authenticated = 0; slotsrestoreReplyAck(c, -1, "invalid password"); } } /* * * SLOTSRESTORE-ASYNC-SELECT $db * */ void slotsrestoreAsyncSelectCommand(client *c) { long long db; if (getLongLongFromObject(c->argv[1], &db) != C_OK || !(db >= 0 && db <= INT_MAX) || selectDb(c, db) != C_OK) { slotsrestoreReplyAck(c, -1, "invalid DB index (%s)", c->argv[1]->ptr); } else { slotsrestoreReplyAck(c, 0, "OK"); } }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spring Boot系列实战文章合集(附源码)
概 述 文章开始之前先感叹一番吧。个人从之前的 C语言项目开发转到 Java项目开发来之后开始学着用 Spring Boot做一些后端服务,不得不说 Spring Boot脚手架式的开发真的是十分便利,最近连掉头发现象也好了很多,于是从内心感叹 Java阵营程序员真的比 C阵营程序员工作起来舒服多了,原因就在于Java领域繁荣的生态圈催生了一大批诸如 Spring Boot这样优秀的框架的出现。 这段时间也陆陆续续记录了一些有关 Spring Boot应用层开发的点点滴滴,特在此汇聚成文章合集,并 放在了Github上,项目名为 Spring-Boot-In-Action,后续仍然会持续更新。 注: 本文首发于 My Personal Blog:CodeSheep·程序羊,欢迎光临 小站 数据库/缓存相关 Guava Cache本地缓存在 Spring Boot应用中的实践 EVCache缓存在 Spring Boot中的实战 Spring Boot应用缓存实践之:Ehcache加持 Spring Boot集成 MyBatis和 SQL Server实践 Elasticsearch搜索...
- 下一篇
Spring Cloud Alibaba基础教程:Nacos配置的多环境管理
前情回顾: 《Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现》 《Spring Cloud Alibaba基础教程:支持的几种服务消费方式》 《Spring Cloud Alibaba基础教程:使用Nacos作为配置中心》 《Spring Cloud Alibaba基础教程:Nacos配置的加载规则详解》 通过之前两篇对Nacos配置管理功能的介绍,已经学会了在Nacos中如何加入配置以及Spring Cloud应用如何通过配置来加载到对应的内容。接下来,我们讨论一个在使用配置中心时,都需要关注的一个问题:多环境的配置如何实现与管理? 多环境管理 在Nacos中,本身有多个不同管理级别的概念,包括:Data ID、Group、Namespace。只要利用好这些层级概念的关系,就可以根据自己的需要来实现多环境的管理。 下面,我就来介绍一下,可以使用的几种实现方式: 使用Data ID与profiles实现 Data ID在Nacos中,我们可以理解为就是一个Spring Cloud应用的配置文件名。通过上一篇《Spring Cloud Alibaba基...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Linux系统CentOS6、CentOS7手动修改IP地址