首页 文章 精选 留言 我的

精选列表

搜索[面试],共4915篇文章
优秀的个人博客,低调大师

面试题:在日常工作中怎么做MySQL优化的?

MySQL常见的优化手段分为下面几个方面: SQL优化、设计优化,硬件优化等,其中每个大的方向中又包含多个小的优化点 下面我们具体来看看 文章首发在公众号(月伴飞鱼),之后同步到掘金和个人网站:xiaoflyfish.cn/ 觉得有收获,希望帮忙点赞,转发下哈,谢谢,谢谢 SQL优化 此优化方案指的是通过优化 SQL 语句以及索引来提高 MySQL 数据库的运行效率,具体内容如下: 分页优化 例如: select * from table where type = 2 and level = 9 order by id asc limit 190289,10; 复制代码 优化方案: 延迟关联 先通过where条件提取出主键,在将该表与原数据表关联,通过主键id提取数据行,而不是通过原来的二级索引提取数据行 例如: select a.* from table a, (select id from table where type = 2 and level = 9 order by id asc limit 190289,10 ) b where a.id = b.id 复制代码 书签方式 书签方式说白了就是找到limit第一个参数对应的主键值,再根据这个主键值再去过滤并limit 例如: select * from table where id > (select * from table where type = 2 and level = 9 order by id asc limit 190289, 1) limit 10; 复制代码 索引优化 正确使用索引 假如我们没有添加索引,那么在查询时就会触发全表扫描,因此查询的数据就会很多,并且查询效率会很低,为了提高查询的性能,我们就需要给最常使用的查询字段上,添加相应的索引,这样才能提高查询的性能 建立覆盖索引 InnoDB使用辅助索引查询数据时会回表,但是如果索引的叶节点中已经包含要查询的字段,那它没有必要再回表查询了,这就叫覆盖索引 例如对于如下查询: select name from test where city='上海' 复制代码 我们将被查询的字段建立到联合索引中,这样查询结果就可以直接从索引中获取 alter table test add index idx_city_name (city, name); 复制代码 在 MySQL 5.0 之前的版本尽量避免使用or查询 在 MySQL 5.0 之前的版本要尽量避免使用 or 查询,可以使用 union 或者子查询来替代,因为早期的 MySQL 版本使用 or 查询可能会导致索引失效,在 MySQL 5.0 之后的版本中引入了索引合并 索引合并简单来说就是把多条件查询,比如or或and查询对多个索引分别进行条件扫描,然后将它们各自的结果进行合并,因此就不会导致索引失效的问题了 如果从Explain执行计划的type列的值是index_merge可以看出MySQL使用索引合并的方式来执行对表的查询 避免在 where 查询条件中使用 != 或者 <> 操作符 SQL中,不等于操作符会导致查询引擎放弃索引索引,引起全表扫描,即使比较的字段上有索引 解决方法:通过把不等于操作符改成or,可以使用索引,避免全表扫描 例如,把column<>’aaa’,改成column>’aaa’ or column<’aaa’ ,就可以使用索引了 适当使用前缀索引 MySQL 是支持前缀索引的,也就是说我们可以定义字符串的一部分来作为索引 我们知道索引越长占用的磁盘空间就越大,那么在相同数据页中能放下的索引值也就越少,这就意味着搜索索引需要的查询时间也就越长,进而查询的效率就会降低,所以我们可以适当的选择使用前缀索引,以减少空间的占用和提高查询效率 比如,邮箱的后缀都是固定的“@xxx.com”,那么类似这种后面几位为固定值的字段就非常适合定义为前缀索引 alter table test add index index2(email(6)); 复制代码 使用前缀索引,定义好长度,就可以做到既节省空间,又不用额外增加太多的查询成本 需要注意的是,前缀索引也存在缺点,MySQL无法利用前缀索引做order by和group by 操作,也无法作为覆盖索引 查询具体的字段而非全部字段 要尽量避免使用 select *,而是查询需要的字段,这样可以提升速度,以及减少网络传输的带宽压力 优化子查询 尽量使用 Join 语句来替代子查询,因为子查询是嵌套查询,而嵌套查询会新创建一张临时表,而临时表的创建与销毁会占用一定的系统资源以及花费一定的时间,同时对于返回结果集比较大的子查询,其对查询性能的影响更大 小表驱动大表 我们要尽量使用小表驱动大表的方式进行查询,也就是如果 B 表的数据小于 A 表的数据,那执行的顺序就是先查 B 表再查 A 表,具体查询语句如下: select name from A where id in (select id from B); 复制代码 不要在列上进行运算操作 不要在列字段上进行算术运算或其他表达式运算,否则可能会导致查询引擎无法正确使用索引,从而影响了查询的效率 select * from test where id + 1 = 50; select * from test where month(updateTime) = 7; 复制代码 一个很容易踩的坑:隐式类型转换: select * from test where skuId=123456 复制代码 skuId这个字段上有索引,但是explain的结果却显示这条语句会全表扫描 原因在于skuId的字符类型是varchar(32),比较值却是整型,故需要做类型转换 适当增加冗余字段 增加冗余字段可以减少大量的连表查询,因为多张表的连表查询性能很低,所有可以适当的增加冗余字段,以减少多张表的关联查询,这是以空间换时间的优化策略 正确使用联合索引 使用了 B+ 树的 MySQL 数据库引擎,比如 InnoDB 引擎,在每次查询复合字段时是从左往右匹配数据的,因此在创建联合索引的时候需要注意索引创建的顺序 例如,我们创建了一个联合索引是 idx(name,age,sex),那么当我们使用,姓名+年龄+性别、姓名+年龄、姓名等这种最左前缀查询条件时,就会触发联合索引进行查询;然而如果非最左匹配的查询条件,例如,性别+姓名这种查询条件就不会触发联合索引 Join优化 MySQL的join语句连接表使用的是nested-loop join算法,这个过程类似于嵌套循环,简单来说,就是遍历驱动表(外层表),每读出一行数据,取出连接字段到被驱动表(内层表)里查找满足条件的行,组成结果行 要提升join语句的性能,就要尽可能减少嵌套循环的循环次数 一个显著优化方式是对被驱动表的join字段建立索引,利用索引能快速匹配到对应的行,避免与内层表每一行记录做比较,极大地减少总循环次数。另一个优化点,就是连接时用小结果集驱动大结果集,在索引优化的基础上能进一步减少嵌套循环的次数 如果难以判断哪个是大表,哪个是小表,可以用inner join连接,MySQL会自动选择小表去驱动大表 避免使用JOIN关联太多的表 对于 MySQL 来说,是存在关联缓存的,缓存的大小可以由join_buffer_size参数进行设置 在 MySQL 中,对于同一个 SQL 多关联(join)一个表,就会多分配一个关联缓存,如果在一个 SQL 中关联的表越多,所占用的内存也就越大 如果程序中大量的使用了多表关联的操作,同时join_buffer_size设置的也不合理的情况下,就容易造成服务器内存溢出的情况,就会影响到服务器数据库性能的稳定性 排序优化 利用索引扫描做排序 MySQL有两种方式生成有序结果:其一是对结果集进行排序的操作,其二是按照索引顺序扫描得出的结果自然是有序的 但是如果索引不能覆盖查询所需列,就不得不每扫描一条记录回表查询一次,这个读操作是随机IO,通常会比顺序全表扫描还慢 因此,在设计索引时,尽可能使用同一个索引既满足排序又用于查找行 例如: --建立索引(date,staff_id,customer_id) select staff_id, customer_id from test where date = '2010-01-01' order by staff_id,customer_id; 复制代码 只有当索引的列顺序和ORDER BY子句的顺序完全一致,并且所有列的排序方向都一样时,才能够使用索引来对结果做排序 UNION优化 MySQL处理union的策略是先创建临时表,然后将各个查询结果填充到临时表中最后再来做查询,很多优化策略在union查询中都会失效,因为它无法利用索引 最好手工将where、limit等子句下推到union的各个子查询中,以便优化器可以充分利用这些条件进行优化 此外,除非确实需要服务器去重,一定要使用union all,如果不加all关键字,MySQL会给临时表加上distinct选项,这会导致对整个临时表做唯一性检查,代价很高 慢查询日志 出现慢查询通常的排查手段是先使用慢查询日志功能,查询出比较慢的 SQL 语句,然后再通过 Explain 来查询 SQL 语句的执行计划,最后分析并定位出问题的根源,再进行处理 慢查询日志指的是在 MySQL 中可以通过配置来开启慢查询日志的记录功能,超过long_query_time值的 SQL 将会被记录在日志中 我们可以通过设置“slow_query_log=1”来开启慢查询 需要注意的是,在开启慢日志功能之后,会对 MySQL 的性能造成一定的影响,因此在生产环境中要慎用此功能 设计优化 尽量避免使用NULL NULL在MySQL中不好处理,存储需要额外空间,运算也需要特殊的运算符,含有NULL的列很难进行查询优化 应当指定列为not null,用0、空串或其他特殊的值代替空值,比如定义为int not null default 0 最小数据长度 越小的数据类型长度通常在磁盘、内存和CPU缓存中都需要更少的空间,处理起来更快 使用最简单数据类型 简单的数据类型操作代价更低,比如:能使用 int 类型就不要使用 varchar 类型,因为 int 类型比 varchar 类型的查询效率更高 尽量少定义 text 类型 text 类型的查询效率很低,如果必须要使用 text 定义字段,可以把此字段分离成子表,需要查询此字段时使用联合查询,这样可以提高主表的查询效率 适当分表、分库策略 分表是指当一张表中的字段更多时,可以尝试将一张大表拆分为多张子表,把使用比较高频的主信息放入主表中,其他的放入子表,这样我们大部分查询只需要查询字段更少的主表就可以完成了,从而有效的提高了查询的效率 分库是指将一个数据库分为多个数据库。比如我们把一个数据库拆分为了多个数据库,一个主数据库用于写入和修改数据,其他的用于同步主数据并提供给客户端查询,这样就把一个库的读和写的压力,分摊给了多个库,从而提高了数据库整体的运行效率 常见类型选择 整数类型宽度设置 MySQL可以为整数类型指定宽度,例如int(11),实际上并没有意义,它并不会限制值的范围,对于存储和计算来说,int(1)和int(20)是相同的 VARCHAR和CHAR类型 char类型是定长的,而varchar存储可变字符串,比定长更省空间,但是varchar需要额外1或2个字节记录字符串长度,更新时也容易产生碎片 需要结合使用场景来选择:如果字符串列最大长度比平均长度大很多,或者列的更新很少,选择varchar较合适;如果要存很短的字符串,或者字符串值长度都相同,比如MD5值,或者列数据经常变更,选择使用char类型 DATETIME和TIMESTAMP类型 datetime的范围更大,能表示从1001到9999年,timestamp只能表示从1970年到2038年。datetime与时区无关,timestamp显示值依赖于时区。在大多数场景下,这两种类型都能良好地工作,但是建议使用timestamp,因为datetime占用8个字节,timestamp只占用了4个字节,timestamp空间效率更高 BLOB和TEXT类型 blob和text都是为存储很大数据而设计的字符串数据类型,分别采用二进制和字符方式存储 在实际使用中,要慎用这两种类型,它们的查询效率很低,如果字段必须要使用这两种类型,可以把此字段分离成子表,需要查询此字段时使用联合查询,这样可以提高主表的查询效率 范式化 当数据较好范式化时,修改的数据更少,而且范式化的表通常要小,可以有更多的数据缓存在内存中,所以执行操作会更快 缺点则是查询时需要更多的关联 第一范式:字段不可分割,数据库默认支持 第二范式:消除对主键的部分依赖,可以在表中加上一个与业务逻辑无关的字段作为主键,比如用自增id 第三范式:消除对主键的传递依赖,可以将表拆分,减少数据冗余 硬件优化 MySQL 对硬件的要求主要体现在三个方面:磁盘、网络和内存 磁盘 磁盘应该尽量使用有高性能读写能力的磁盘,比如固态硬盘,这样就可以减少 I/O 运行的时间,从而提高了 MySQL 整体的运行效率 磁盘也可以尽量使用多个小磁盘而不是一个大磁盘,因为磁盘的转速是固定的,有多个小磁盘就相当于拥有多个并行运行的磁盘一样 网络 保证网络带宽的通畅(低延迟)以及够大的网络带宽是 MySQL 正常运行的基本条件,如果条件允许的话也可以设置多个网卡,以提高网络高峰期 MySQL 服务器的运行效率 内存 MySQL 服务器的内存越大,那么存储和缓存的信息也就越多,而内存的性能是非常高的,从而提高了整个 MySQL 的运行效率 如果以上文章对您有帮助,请给我们的开源项目点点star:http://github.crmeb.net/u/defu不胜感激! 来自 “开源世界 ” ,链接:https://ym.baisou.ltd/post/726.html,如需转载,请注明出处,否则将追究法律责任。

优秀的个人博客,低调大师

面试官还能怎么说.

女:你一点都不注重细节;男:你说我不都细节?? 你看看下面的红黑树讲解,,你说我不够细节??随后男发送下面红黑树代码发给女朋友之后;女:哇,哥哥果然好细;男:细节拉满,关注又是一个细节;当然也不能太细,这样也不太好; 代码+图解 红黑树的性质: 首先记住它的性质: 在增删节点时需要调整满足性质 根节点是黑色的; 如果一个节点是红色的,则它的两个孩子结点是黑色的; 对于每个结点,从该结点到其所有后代叶结点的简单路径上,均包含相同数目的黑色结点; 每个叶子结点都是黑色的(此处的叶子结点指的是空结点); 对颜色进行定义: //我们用枚举表示表示出两种颜色 //枚举默认第一个参数 BLACK 为 0 ,第二个参数 RED 为 1; enum COLOR { BLACK, RED }; 红黑树的节点表示: 看图说节点红黑树的节点参数, 一个指向上面的节点定义为_parent一个指向左边的节点的节点指针 _left一个指向右边的 _right一个pair类型的值一个颜色的识别 当然这么写参数那得来个构造函数了,“细节”。 template<class K,class V> struct RBNode { //各个节点 RBNode<K, V>* _parent; RBNode<K, V>* _left; RBNode<K, V>* _right; //key-value pair<K, V> _kv; //颜色 COLOR _color; RBNode(const pair<K, V>& kv = pair<K, V>()) :_parent(nullptr) , _left(nullptr) , _right(nullptr) , _kv(kv) //新申请的节点定义为RED,黑色的加入,会导致其他所有路径的黑色都遭到破坏; , _color(RED) { } }; 对树的类进行定义: 红黑树的头节点——header 指向如图上图:具体下面再写 template<class K,class V> class RBTree { public: //类型定义,用Node表示RBNode<K,V> typedef RBNode<K,V> Node; RBTree() :_header(new Node) { //创建空树 _header->_left = _header->_right = _header; } private: Node* _header; }; 红黑树的插入: 当插入c节点时,为了满足性质,需要对节点进行调整,为了保证调整最优;如果插入节点的父节点(u)和叔叔节点(u)为红色,那么只需要改变叔叔节点(u)和父节点(p)颜色为黑色,祖父节(g)为黑色即可;当然如果祖父节点(g) 为根节点,再把其变为黑色;不是根节点,向上调整; 结构右旋: 写一个左旋的操作,在树结构变化时可高效的改变保证结构性质;如果当出现情况1:插入节点为c,当它的叔叔节点为黑色,或者叔叔节点不存在,操作相同; 记录三个节点,只有这三个节点指向变化; 由图: 用parent表示g, 用subL表示p, 用subLR表示p的右子节点即c; 用pparent表示g的父节点; // parent // subL // subLR void RotateR(Node* parent) { Node* subL = parent->_left; Node* subLR = subL->_right; //改变节点指向: subL->_right = parent; parent->_left = subLR; //如果存在父节点存在,如果为nullptr则没有父节点; if (subLR) subLR->_parent = parent; //判断根 if (parent == _header->_parent) { _header->_parent = subL; subL->_parent = _header; } //不是根 else { Node* pparent = parent->_parent; subL->_parent = pparent; if (pparent->_left == parent) pparent->_left = subL; else pparent->_right = subL; } //先要将parent的父节点赋值出去,才能改变paremt的父节点; parent->_parent = subL; } 结构左旋: :如图顺序(未画完全,子节点仍然有且满足性质) 记录三个节点,只有这三个节点指向变化;(操作和右旋思路相同) 由图: 用pparent表示p的父节点; // parent // subR // subRL void RotateL(Node* parent) { Node* subR = parent->_right; Node* subRL = subR->_left; subR->_left = parent; parent->_right = subRL; if (subRL) { subRL->_parent = parent; } if (_header->_parent == parent) { _header->_parent = subR; subR->_parent = _header; } else { Node* pparent = parent->_parent; subR->_parent = pparent; if (pparent->_left == parent) pparent->_left = subR; else pparent->_right = subR; } parent->_parent = subR; } 头节点指向的左右节点: 如图:header头节点指向最左最右的节点; 找红黑树的最左节点: //最左节点 Node* leftMost() { Node* cur = _header->_parent; while (cur && cur->_left) { cur = cur->_left; } return cur; } 找红黑树的最右节点: //最右节点 Node* rightMost() { Node* cur = _header->_parent; while (cur && cur->_right) { cur = cur->_right; } return cur; } 红黑树的插入情况分析: 情况1:插入节点的父节点颜色为黑色; 处理1:直接插入; 情况2:插入节点的父节点为红色,叔叔节点存在且为红色; 处理2:插入后叔叔节点和父节点颜色变黑,祖父节点颜色变红,向上遍历查看情况; 情况3:父节点为红色,叔叔节点存在为黑色或不存在,插入节点为父节点的左子树,父节点为祖父节点的左子树; 处理3:进行结构右旋操作(RotateR); 情况4:和情况3的结构调整相反父节点为红色,叔叔节点存在为黑色或不存在,插入节点为父节点的右子树,父节点为祖父节点的右子树; 处理4:进行结构左旋操作(RotateL); 情况5:和情况6结构调整相反父节点为红色,叔叔节点存在为黑色或不存在,插入节点为父节点的左子树,父节点为祖父节点的右子树; 处理5:先以父节点为根,先右旋操作(RotateR),再以祖父节点为根,左旋操作(RotateL); 情况6:父节点为红色,叔叔节点存在为黑色或不存在,插入节点为父节点的右子树,父节点为祖父节点的左子树; 处理6:先以父节点为根,左旋操作(RotateL),再以祖父节点为根,右旋操作(RotateR); 上图:(个情况) 情况1: 情况2: 情况3: 情况4:和情况3相反,自己画去; 情况5:情况6相反,自己画去; 情况6 上代码: //插入 bool insert(const pair<K,V>& kv) { //1.搜索树颜色 //空树:_header->parent:nullptr if (_header->_parent == nullptr) { //创建根节点 Node* root = new Node(kv); _header->_parent = root; root->_parent = _header; _header->_left = _header->_right = root; //根节点是黑色 root->_color = BLACK; return true; } //从根节点开始搜索 //正常插入 Node* cur = _header->_parent; Node* parent = nullptr; while (cur) { parent = cur; //和key值进行比较 //kv: pair<K,V>, key:pair.first if (cur->_kv.first == kv.first) { //key值存在,不允许插入 return false; } else if (cur->_kv.first > kv.first) { //向小的找,左子树去找 cur = cur->_left; } else { //向大的找,右子树去找 cur = cur->_right; } }//找到了 //创建待插入节点 cur = new Node(kv); if (parent->_kv.first > cur->_kv.first) parent->_left = cur;//比较大小,插在左右那个位置; else parent->_right = cur; cur->_parent = parent; //2.调整结构或者修改颜色 //判断是否至少有三层; while (cur != _header->_parent && cur->_parent->_color == RED) { parent = cur->_parent; Node* gfather = parent->_parent; //可能出现情况6,情况3 if (gfather->_left == parent) { Node* uncle = gfather->_right; //情况2:uncle存在,并且都是红色 if (uncle && uncle->_color == RED) { parent->_color = uncle->_color = BLACK; gfather->_color = RED; //继续向上更新 cur = gfather; } // else { cout << "Rotate" << endl; //双旋结构 //情况6: if (cur == parent->_right) { RotateL(parent); swap(cur, parent); } //经过第一次左旋后,情况6 与情况3 只有cur节点和父节点的指向不一样, //经过swap两者后,接下来操作相同都为右旋; RotateR(gfather); parent->_color = BLACK; gfather->_color = RED; break; } } else { //gfather->_right=parent的情况; //情况4,情况5,; //处理与情况3,情况6相反; Node* uncle = gfather->_right; //uncle存在,且都是红色,最简 if (uncle && uncle->_color == RED) { parent->_color = uncle->_color = BLACK; gfather->_color = RED; cur = gfather; } //情况3:uncle不存在,或者uncle存在为黑色 //双旋结构 else { // gfather // uncle parent // cur // if (cur == parent->_left) { RotateR(parent); swap(cur, parent); } RotateL(gfather); parent->_color = BLACK; gfather->_color = RED; break; } } } //根节点的颜色改成黑的; _header->_parent->_color = BLACK; //更新header左右指向 _header->_left = leftMost(); _header->_right = rightMost(); } 中序遍历进行打印: //对中序遍历进行封装: void inorder() { _inorder(_header->_parent); cout << endl; } //使用递归从叶子节点开始; //1.打印最左节点, //2.打印此节点 //3.打印右节点 void _inorder(Node* root) { if (root) { _inorder(root->_left); cout << root->_kv.first << "--->" << root->_kv.second << endl; _inorder(root->_right); } } 判断是否满足红黑树的性质: //红黑树: //1.根:黑色 //2.每条路径黑色个数相同 //3.红色不能连续 bool isBalance() { if (_header->_parent == nullptr) return true;//只有头节点也是满足红黑树 Node* root = _header->_parent; //1.判断根节点的颜色 if (root->_color == RED) return false; //统计一条路劲的黑色节点个数 int bCount = 0; Node* cur = root; while (cur) { if (cur->_color == BLACK) ++bCount; cur = cur->_left; } //遍历每一条路径 int curBCount = 0; return _isBalance(root, bCount, curBCount); } bool _isBalance(Node* root, int& bCount, int curBCount) { //当root为空,一条路径遍历结束 if(root == nullptr){ //判断黑色节点个数是否相同 if (bCount != curBCount) return false; else return true; } //判断是否为黑节点 if (root->_color == BLACK) ++curBCount; //判断红色节点是否有连续 if (root->_parent && root->_color == RED && root->_parent->_color == RED) { cout << "data: " << root->_kv.first << "--->" << root->_kv.second << endl; return false; } return _isBalance(root->_left, bCount, curBCount) && _isBalance(root->_right,bCount, curBCount); } 完整代码如下:可直接调试 #include<iostream> #include<iostream> #include<utility> using namespace std; //设置颜色属性 enum COLOR { BLACK, RED }; template<class K,class V> struct RBNode { //typedef bool color; //各个节点 RBNode<K, V>* _parent; RBNode<K, V>* _left; RBNode<K, V>* _right; //key-value pair<K, V> _kv; //颜色 COLOR _color; RBNode(const pair<K, V>& kv = pair<K, V>()) :_parent(nullptr) , _left(nullptr) , _right(nullptr) , _kv(kv) , _color(RED) { } }; template<class K,class V> class RBTree { public: //类型定义 typedef RBNode<K,V> Node; RBTree() :_header(new Node) { //创建空树 _header->_left = _header->_right = _header; } //插入 bool insert(const pair<K,V>& kv) { //1.搜索树颜色 //空树:_header->parent:nullptr if (_header->_parent == nullptr) { //创建根节点 Node* root = new Node(kv); _header->_parent = root; root->_parent = _header; _header->_left = _header->_right = root; //根节点是黑色 root->_color = BLACK; return true; } //从根节点开始搜索 //正常插入 Node* cur = _header->_parent; Node* parent = nullptr; while (cur) { parent = cur; //和key值进行比较 //kv: pair<K,V>, key:pair.first if (cur->_kv.first == kv.first) { //key值不允许重复 return false; } else if (cur->_kv.first > kv.first) { cur = cur->_left; } else { cur = cur->_right; } } //创建待插入节点 cur = new Node(kv); if (parent->_kv.first > cur->_kv.first) parent->_left = cur; else parent->_right = cur; cur->_parent = parent; //2.修改颜色或者调整结构 while (cur != _header->_parent && cur->_parent->_color == RED) { parent = cur->_parent; Node* gfather = parent->_parent; if (gfather->_left == parent) { Node* uncle = gfather->_right; //1.uncle存在,并且都是红色 if (uncle && uncle->_color == RED) { parent->_color = uncle->_color = BLACK; gfather->_color = RED; //继续更新 cur = gfather; } else { cout << "Rotate" << endl; //双旋结构 if (cur == parent->_right) { RotateL(parent); swap(cur, parent); } RotateR(gfather); parent->_color = BLACK; gfather->_color = RED; break; } } else { //gfather->_right=parent; Node* uncle = gfather->_right; //uncle存在,且都是红色,最简 if (uncle && uncle->_color == RED) { parent->_color = uncle->_color = BLACK; gfather->_color = RED; cur = gfather; } //uncle不存在,或者uncle存在为黑色 //双旋结构 else { // gfather // uncle parent // cur // if (cur == parent->_left) { RotateR(parent); swap(cur, parent); } RotateL(gfather); parent->_color = BLACK; gfather->_color = RED; break; } } } //根节点的颜色改成黑的; _header->_parent->_color = BLACK; //更新header左右指向 _header->_left = leftMost(); _header->_right = rightMost(); } // parent // subR // subRL void RotateL(Node* parent) { Node* subR = parent->_right; Node* subRL = subR->_left; subR->_left = parent; parent->_right = subRL; if (subRL) { subRL->_parent = parent; } if (_header->_parent == parent) { _header->_parent = subR; subR->_parent = _header; } else { Node* pparent = parent->_parent; subR->_parent = pparent; if (pparent->_left == parent) pparent->_left = subR; else pparent->_right = subR; } parent->_parent = subR; } // parent // subL // subLR void RotateR(Node* parent) { Node* subL = parent->_left; Node* subLR = subL->_right; subL->_right = parent; parent->_left = subLR; if (subLR) subLR->_parent = parent; //判断根 if (parent == _header->_parent) { _header->_parent = subL; subL->_parent = _header; } //不是根 else { Node* pparent = parent->_parent; subL->_parent = pparent; if (pparent->_left == parent) pparent->_left = subL; else pparent->_right = subL; } parent->_parent = subL; } //最左节点 Node* leftMost() { Node* cur = _header->_parent; while (cur && cur->_left) { cur = cur->_left; } return cur; } //最右节点 Node* rightMost() { Node* cur = _header->_parent; while (cur && cur->_right) { cur = cur->_right; } return cur; } void inorder() { _inorder(_header->_parent); cout << endl; } void _inorder(Node* root) { if (root) { _inorder(root->_left); cout << root->_kv.first << "--->" << root->_kv.second << endl; _inorder(root->_right); } } //红黑树: //1.根:黑色 //2.每条路径黑色个数相同 //3.红色不能连续 bool isBalance() { if (_header->_parent == nullptr) return true; Node* root = _header->_parent; if (root->_color == RED) return false; //统计一条路劲的黑色节点个数 int bCount = 0; Node* cur = root; while (cur) { if (cur->_color == BLACK) ++bCount; cur = cur->_left; } //遍历每一条路径 int curBCount = 0; return _isBalance(root, bCount, curBCount); } bool _isBalance(Node* root, int& bCount, int curBCount) { //当root为空,一条路径遍历结束 if(root == nullptr){ //判断黑色节点个数是否相同 if (bCount != curBCount) return false; else return true; } //判断是否为黑节点 if (root->_color == BLACK) ++curBCount; //判断红色节点是否有连续 if (root->_parent && root->_color == RED && root->_parent->_color == RED) { cout << "data: " << root->_kv.first << "--->" << root->_kv.second << endl; return false; } return _isBalance(root->_left, bCount, curBCount) && _isBalance(root->_right,bCount, curBCount); } private: Node* _header; }; void test() { RBTree<int, int> rbt; int n; cin >> n; for (int i = n; i > 0; --i) { rbt.insert(make_pair(i, i)); } rbt.inorder(); cout << rbt.isBalance(); } int main() { test(); return 0; }

优秀的个人博客,低调大师

面试官问redis分布式锁,如何设计才能让他满意?

前言 对于分布式锁的问题我也查过很多资料,感觉很多方式实现的并不完善,或者看着云里雾里的,不知所以然,于是就整理了这篇文章,希望对您有用,有写的不对的地方,欢迎留言指正。 首先咱们来聊聊什么是分布式锁,到底解决了什么问题?直接看代码 $stock=$this->getStockFromDb();//查询剩余库存 if($stock>0){ $this->ReduceStockInDb();//在数据库中进行减库存操作 echo"successful"; }else{ echo"库存不足"; } 很简单的一个场景,用户下单,咱们查询商品库存够不够,不够的话直接返回库存不足类似的错误信息,如果库存够的话直接在数据库中库存-1,然后返回成功,在业务逻辑上这段代码是没有什么问题的。 但是,这段代码是存在严重的问题的。 如果库存只剩 1,并且在并发比较高的情况下,比如两个请求同时执行了这段代码,同时查到库存为 1,然后顺利成章的都去数据库执行 stock-1 的操作,这样库存就会变成-1,然后就会引发超卖的现象,刚才说的是两个请求同时执行,如果同时几千个请求打过来,可见造成的损失是非常大的。于是呢有些聪明人就想了个办法,办法如下。 大家都知道 redis 有个 setnx 命令,不知道的话也没关系,我已经帮你查过了 我们把上面的代码优化一下 version-1 $lock_key="lock_key"; $res=$redis->setNx($lock_key,1); if(!$res){ return"error_code"; } $stock=$this->getStockFromDb();//查询剩余库存 if($stock>0){ $this->ReduceStockInDb();//在数据库中进行减库存操作 echo"successful"; }else{ echo"库存不足"; } $redis->delete($lock_key); 第一次请求进来会去 setNx,当然结果是返回 true,因为 lock_key 不存在,然后下面业务逻辑正常进行,任务执行完了之后把lock_key删除掉,这样下一次请求进来重复上述逻辑 第二次请求进来同样会去执行 setNx,结果返回 false,因为lock_key已经存在,然后直接返回错误信息(你双11抢购秒杀产品的时候给你返回的系统繁忙就是这么来的),不执行库存减 1 的操作 有的同学可能有疑惑,咱们不是说高并发的情况下么?要是两个请求同时 setNx 的话获取的结果不都是 true 了,同样会同时去执行业务逻辑,问题不是一样没解决么?但是大家要明白 redis 是单线程的,具备原子性,不同的请求执行 setnx 是顺序执行的,所以这个是不用担心的。 看似问题解决了,其实并不然。 我们这里伪代码写的简单,查询一下库存,然后减1操作而已,但是真实的生产环境中的情况是非常复杂的,在一些极端情况下,程序很可能会报错,崩溃,如果第一次执行加锁了之后程序报错了,那这个锁永远存在,接下来的请求永远也请求不进来了,所以咱们继续优化 version-2 try{//新加入trycatch处理,这样程序万一报错会把锁删除掉 $lock_key="lock_key"; $expire_time=5;//新加入过期时间,这样锁不会一直占有 $res=$redis->setNx($lock_key,1,$expire_time); if(!$res){ return"error_code"; } $stock=$this->getStockFromDb();//查询剩余库存 if($stock>0){ $this->ReduceStockInDb();//在数据库中进行减库存操作 echo"successful"; }else{ echo"库存不足"; } }finally{ $redis->delete($lock_key); } 在setnx的时候给加上过期时间,这样至少不会让锁一直存在成为死锁 做try catch处理,万一程序抛出异常把锁删掉,也是为了解决死锁问题 这次是把死锁问题解决了,但是问题还是存在,大家可以先想一想还存在什么问题再接着往下看。 存在的问题如下 我们的过期时间是5秒钟,万一这个请求执行了6秒钟怎么办?超出的那一秒,跟没有加锁有什么区别?其实不仅仅如此,还有一个更严重的问题存在。比如第二个请求也是执行6秒,那么在第二个请求在超出的那1秒才进来的时候,第一个请求执行完了,当然会删除第二个请求加的锁,如果一直并发都很大的话,锁跟没有加没什么区别。 针对上述问题,最直接的办法是加长过期时间,但是这个不是解决问题的最终办法。把时间设置过长也会产生新的问题,比如各种原因机器崩溃了,需要重启,然后你把锁设置的时间是1年,同时也没有delete掉,难道机器重启了再等一年?另外这样设置固定值的解决方案在计算机当中是不允许的,曾经的“千年虫”问题就是类似的原因导致的 在加超时时间的时候一定要注意一定是一次性加上,保证其原子性,不要先setnx之后,再设置expire_time,这样的话万一在setnx之后那一个瞬间系统挂了,这个锁依然会成为一个永久的死锁 其实上述问题的主要原因在于,请求1会删掉请求2的锁,所以说锁需要保证唯一性。 咱们接着优化 version-3 try{//新加入trycatch处理,这样程序万一报错会把锁删除掉 $lock_key="lock_key"; $expire_time=5;//新加入过期时间,这样锁不会一直占有 $client_id=session_create_id();//对每个请求生成唯一性的id $res=$redis->setNx($lock_key,$client_id,$expire_time); if(!$res){ return"error_code"; } $stock=$this->getStockFromDb();//查询剩余库存 if($stock>0){ $this->ReduceStockInDb();//在数据库中进行减库存操作 echo"successful"; }else{ echo"库存不足"; } }finally{ if($redis->get($lock_key)==$client_id){//在这里加一个判断,保证每次删除的锁是当次请求加的锁,这样避免误删了别的请求加的锁 $redis->delete($lock_key); } } 我们在每个请求生成了唯一client_id,并且把该值写入了lock_key中 在最后删除锁的时候会先判断这个lock_key是否是该请求生成的,如果不是的话则不会删除 但是上面方案还有问题,我们看最后 redis是先进行了get操作判断,然后再删除,是两步操作,并没有保证其原子性,redis的多步操作可以用lua脚本来保证原子性,其实看到lua也不需要感觉太陌生,他就是一种语言而已,在这里的作用是把多个redis操作打包成一个命令去执行,保证了原子性而已 version-4 try{//新加入trycatch处理,这样程序万一报错会把锁删除掉 $lock_key="lock_key"; $expire_time=5;//新加入过期时间,这样锁不会一直占有 $client_id=session_create_id();//对每个请求生成唯一性的id $res=$redis->setNx($lock_key,$client_id,$expire_time); if(!$res){ return"error_code"; } $stock=$this->getStockFromDb();//查询剩余库存 if($stock>0){ $this->ReduceStockInDb();//在数据库中进行减库存操作 echo"successful"; }else{ echo"库存不足"; } }finally{ $script='//此处用lua脚本执行是为了get对比之后再delete的两步操作的原子性 ifredis.call("GET",KEYS[1])==ARGV[1]then returnredis.call("DEL",KEYS[1]) else return0 end '; return$instance->eval($script,[$lock_key,$client_id],1); } 这样封装之后,分布式锁应该就比较完善了。当然我们还可以进一步的优化一下用户体验 现在比如一个请求进来之后,如果请求被锁住,会立即返回给用户请求失败,请重新尝试,我们可以适当的延长一点这个时间,不要立即返回给用户请求失败,这样体验会更好 具体方式为用户请求进来如果遇到了锁,可以适当的等待一些时间之后重试,重试的时候如果锁释放了,则这次请求就可以成功 version-5 $retry_times=3;//重试次数 $usleep_times=5000;//重试间隔时间 try{//新加入trycatch处理,这样程序万一报错会把锁删除掉 $lock_key="lock_key"; $expire_time=5;//新加入过期时间,这样锁不会一直占有 while($retry_times>0){ $client_id=session_create_id();//对每个请求生成唯一性的id $res=$redis->setNx($lock_key,$client_id,$expire_time); if($res){ break; } echo"尝试重新获取锁"; $retry_times--; usleep($usleep_times); } if(!$res){//重试三次之后都没有获取到锁则给用户返回错误信息 return"error_code"; } $stock=$this->getStockFromDb();//查询剩余库存 if($stock>0){ $this->ReduceStockInDb();//在数据库中进行减库存操作 echo"successful"; }else{ echo"库存不足"; } }finally{ $script='//此处用lua脚本执行是为了get对比之后再delete的两步操作的原子性 ifredis.call("GET",KEYS[1])==ARGV[1]then returnredis.call("DEL",KEYS[1]) else return0 end '; return$instance->eval($script,[$lock_key,$client_id],1); } 当然上面的分布式锁还是不够完善的,比如redis主从同步延迟,就会产生问题,像java中redission实现的思想是非常好的,大家感兴趣可以看看源码,今天就聊到这里,感兴趣的朋友可以留言大家一起讨论

优秀的个人博客,低调大师

每日一博 | 我是这样给阿里面试官吹 ConcurrentHashMap 的

因为上篇文章HashMap已经讲解的很详细了,因此此篇文章会简单介绍思路,再学习并发HashMap就简单很多了,上一篇文章中我们最终知道HashMap是线程不安全的,因此在老版本JDK中提供了HashTable来实现多线程级别的,改变之处重要有以下几点。 ❝ HashTable的 put, get, remove等方法是通过 synchronized来修饰保证其线程安全性的。 HashTable是 不允许key跟value为null的。 问题是 synchronized是个关键字级别的重量锁,在get数据的时候任何写入操作都不允许。相对来说性能不好。因此目前主要用的 ConcurrentHashMap来保证线程安全性。 ❞ ConcurrentHashMap主要分为JDK<=7跟JDK>=8的两个版本,ConcurrentHashMap的空间利用率更低一般只有10%~20%,接下来分别介绍。 JDK7 先宏观说下JDK7中的大致组成,ConcurrentHashMap由Segment数组结构和HashEntry数组组成。Segment是一种可重入锁,是一种数组和链表的结构,一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构。正是通过Segment分段锁,ConcurrentHashMap实现了高效率的并发。缺点是并发程度是有segment数组来决定的,并发度一旦初始化无法扩容。先绘制个ConcurrentHashMap的形象直观图。要想理解currentHashMap,可以简单的理解为将数据「分表分库」。ConcurrentHashMap是由 Segment 数组 结构和HashEntry 数组 结构组成。 ❝ Segment 是一种可重入锁 ReentrantLock的子类 ,在 ConcurrentHashMap 里扮演锁的角色, HashEntry则用于存储键值对数据。 ConcurrentHashMap 里包含一个 Segment 数组来实现锁分离, Segment的结构和 HashMap 类似,一个 Segment里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment守护者一个 HashEntry 数组里的元素,当对 HashEntry数组的数据进行修改时,必须首先获得它对应的 Segment 锁。 ❞ 我们先看下segment类: staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable{transientvolatileHashEntry<K,V>[]table;//包含一个HashMap可以理解为} 可以理解为我们的每个segment都是实现了Lock功能的HashMap。如果我们同时有多个segment形成了segment数组那我们就可以实现并发咯。 我们看下 currentHashMap的构造函数,先总结几点。 每一个segment里面包含的table(HashEntry数组)初始化大小也一定是2的次幂 这里设置了若干个用于位计算的参数。 initialCapacity:初始容量大小 ,默认16。 loadFactor: 扩容因子,默认0.75,当一个Segment存储的元素数量大于initialCapacity* loadFactor时,该Segment会进行一次扩容。 concurrencyLevel:并发度,默认16。并发度可以理解为程序运行时能够 「同时更新」ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的 分段锁个数,即Segment[]的数组长度。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。 segment的数组大小最终一定是2的次幂 构造函数详解: //initialCapacity是我们保存所以KV数据的初始值//loadFactor这个就是HashMap的负载因子//我们segment数组的初始化大小@SuppressWarnings("unchecked")publicConcurrentHashMap(intinitialCapacity,floatloadFactor,intconcurrencyLevel){if(!(loadFactor>0)||initialCapacity<0||concurrencyLevel<=0)thrownewIllegalArgumentException();if(concurrencyLevel>MAX_SEGMENTS)//最大允许segment的个数,不能超过1<24concurrencyLevel=MAX_SEGMENTS;intsshift=0;//类似扰动函数intssize=1;while(ssize<concurrencyLevel){++sshift;ssize<<=1;//确保segment一定是2次幂}this.segmentShift=32-sshift;//有点类似与扰动函数,跟下面的参数配合使用实现当前元素落到那个segment上面。this.segmentMask=ssize-1;//为了取模专用if(initialCapacity>MAXIMUM_CAPACITY)//不能大于1<30initialCapacity=MAXIMUM_CAPACITY;intc=initialCapacity/ssize;//总的数组大小被segment分散后需要多少个tableif(c*ssize<initialCapacity)++c;//确保向上取值intcap=MIN_SEGMENT_TABLE_CAPACITY;//每个table初始化大小为2while(cap<c)//单独的一个segment[i]对应的table 容量大小。cap<<=1;//将table的容量初始化为2的次幂Segment<K,V>s0=newSegment<K,V>(loadFactor,(int)(cap*loadFactor),(HashEntry<K,V>[])newHashEntry[cap]);//负载因子,阈值,每个segment的初始化大小。跟hashmap 初始值类似。//并且segment的初始化是懒加载模式,刚开始只有一个s0,其余的在需要的时候才会增加。Segment<K,V>[]ss=(Segment<K,V>[])newSegment[ssize];UNSAFE.putOrderedObject(ss,SBASE,s0);//orderedwriteofsegments[0]this.segments=ss;} hash 不管是我们的get操作还是put操作要需要通过hash来对数据进行定位。 //整体思想就是通过多次不同方式的位运算来努力将数据均匀的分不到目标table中,都是些扰动函数privateinthash(Objectk){inth=hashSeed;if((0!=h)&&(kinstanceofString)){returnsun.misc.Hashing.stringHash32((String)k);}h^=k.hashCode();//single-wordWang/Jenkinshash.h+=(h<<15)^0xffffcd7d;h^=(h>>>10);h+=(h<<3);h^=(h>>>6);h+=(h<<2)+(h<<14);returnh^(h>>>16);} get 相对来说比较简单,无非就是通过 hash找到对应的 segment,继续通过 hash找到对应的 table,然后就是遍历这个链表看是否可以找到,并且要注意 get的时候是没有加锁的。 publicVget(Objectkey){Segment<K,V>s;HashEntry<K,V>[]tab;inth=hash(key);//JDK7中标准的hash值获取算法longu=(((h>>>segmentShift)&segmentMask)<<SSHIFT)+SBASE;//hash值如何映射到对应的segment上if((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab=s.table)!=null){//无非就是获得hash值对应的segment是否存在,for(HashEntry<K,V>e=(HashEntry<K,V>)UNSAFE.getObjectVolatile(tab,((long)(((tab.length-1)&h))<<TSHIFT)+TBASE);e!=null;e=e.next){//看下这个hash值对应的是segment(HashEntry)中的具体位置。然后遍历查询该链表Kk;if((k=e.key)key||(e.hashh&&key.equals(k)))returne.value;}}returnnull;} put 相同的思路,先找到 hash值对应的 segment位置,然后看该 segment位置是否初始化了(因为segment是懒加载模式)。选择性初始化,最终执行put操作。 @SuppressWarnings("unchecked")publicVput(Kkey,Vvalue){Segment<K,V>s;if(valuenull)thrownewNullPointerException();inthash=hash(key);//还是获得最终hash值intj=(hash>>>segmentShift)&segmentMask;//hash值位操作对应的segment数组位置if((s=(Segment<K,V>)UNSAFE.getObject(segments,(j<<SSHIFT)+SBASE))null)s=ensureSegment(j);//初始化时候因为只有第一个segment,如果落在了其余的segment中则需要现初始化。returns.put(key,hash,value,false);//直接在数据中执行put操作。} 其中put操作基本思路跟HashMap几乎一样,只是在开始跟结束进行了加锁的操作tryLock and unlock,然后JDK7中都是先扩容再添加数据的,并且获得不到锁也会进行自旋的tryLock或者lock阻塞排队进行等待(同时获得锁前提前new出新数据)。 finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){//在往该segment写入前,需要先获取该segment的独占锁,获取失败尝试获取自旋锁HashEntry<K,V>node=tryLock()?null:scanAndLockForPut(key,hash,value);VoldValue;try{//segment内部的数组HashEntry<K,V>[]tab=table;//利用hash值,求应该放置的数组下标intindex=(tab.length-1)&hash;//first是数组该位置处的链表的表头HashEntry<K,V>first=entryAt(tab,index);for(HashEntry<K,V>e=first;;){if(e!=null){Kk;if((k=e.key)key||(e.hashhash&&key.equals(k))){oldValue=e.value;if(!onlyIfAbsent){//覆盖旧值e.value=value;++modCount;}break;}//继续顺着链表走e=e.next;}else{// node 是不是 null,这个要看获取锁的过程。没获得锁的线程帮我们创建好了节点,直接头插法//如果不为 null,那就直接将它设置为链表表头;如果是 null,初始化并设置为链表表头。if(node!=null)node.setNext(first);elsenode=newHashEntry<K,V>(hash,key,value,first);intc=count+1;//如果超过了该segment的阈值,这个segment需要扩容if(c>threshold&&tab.length<MAXIMUM_CAPACITY)rehash(node);//扩容else//没有达到阈值,将node放到数组tab的index位置,//将新的结点设置成原链表的表头setEntryAt(tab,index,node);++modCount;count=c;oldValue=null;break;}}}finally{//解锁unlock();}returnoldValue;} 如果加锁失败了调用scanAndLockForPut,完成查找或新建节点的工作。当获取到锁后直接将该节点加入链表即可,「提升」了put操作的性能,这里涉及到自旋。大致过程: ❝ 在我获取不到锁的时候我进行tryLock,准备好new的数据,同时还有一定的次数限制,还要考虑别的已经获得线程的节点修改该头节点。 ❞ privateHashEntry<K,V>scanAndLockForPut(Kkey,inthash,Vvalue){HashEntry<K,V>first=entryForHash(this,hash);HashEntry<K,V>e=first;HashEntry<K,V>node=null;intretries=-1;//negativewhilelocatingnode//循环获取锁while(!tryLock()){HashEntry<K,V>f;//torecheckfirstbelowif(retries<0){if(enull){if(nodenull)//speculativelycreatenode//进到这里说明数组该位置的链表是空的,没有任何元素//当然,进到这里的另一个原因是tryLock()失败,所以该槽存在并发,不一定是该位置node=newHashEntry<K,V>(hash,key,value,null);retries=0;}elseif(key.equals(e.key))retries=0;else//顺着链表往下走e=e.next;}//重试次数如果超过MAX_SCAN_RETRIES(单核1次多核64次),那么不抢了,进入到阻塞队列等待锁//lock()是阻塞方法,直到获取锁后返回elseif(++retries>MAX_SCAN_RETRIES){lock();break;}elseif((retries&1)0&&//进入这里,说明有新的元素进到了链表,并且成为了新的表头//这边的策略是,重新执行scanAndLockForPut方法(f=entryForHash(this,hash))!=first){e=first=f;//re-traverseifentrychangedretries=-1;}}returnnode;} Size 这个size方法比较有趣,他是先无锁的统计下所有的数据量看下前后两次是否数据一样,如果一样则返回数据,如果不一样则要把全部的segment进行加锁,统计,解锁。并且size方法只是返回一个统计性的数字,因此size谨慎使用哦。 publicintsize(){//Tryafewtimestogetaccuratecount.Onfailuredueto//continuousasyncchangesintable,resorttolocking.finalSegment<K,V>[]segments=this.segments;intsize;booleanoverflow;//trueifsizeoverflows32bitslongsum;//sumofmodCountslonglast=0L;//previoussumintretries=-1;//firstiterationisn'tretrytry{for(;;){if(retries++RETRIES_BEFORE_LOCK){//超过2次则全部加锁for(intj=0;j<segments.length;++j)ensureSegment(j).lock();//直接对全部segment加锁消耗性太大}sum=0L;size=0;overflow=false;for(intj=0;j<segments.length;++j){Segment<K,V>seg=segmentAt(segments,j);if(seg!=null){sum+=seg.modCount;//统计的是modCount,涉及到增删该都会加1intc=seg.count;if(c<0||(size+=c)<0)overflow=true;}}if(sumlast)//每一个前后的修改次数一样则认为一样,但凡有一个不一样则直接break。break;last=sum;}}finally{if(retries>RETRIES_BEFORE_LOCK){for(intj=0;j<segments.length;++j)segmentAt(segments,j).unlock();}}returnoverflow?Integer.MAX_VALUE:size;} rehash segment 数组初始化后就不可变了,也就是说 「并发性不可变」,不过 segment里的 table可以扩容为2倍,该方法没有考虑并发,因为执行该方法之前已经获取了锁。其中JDK7中的 rehash思路跟JDK8 中扩容后处理链表的思路一样,个人不过感觉没有8写的精髓好看。 //方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。privatevoidrehash(HashEntry<K,V>node){HashEntry<K,V>[]oldTable=table;intoldCapacity=oldTable.length;//2倍intnewCapacity=oldCapacity<<1;threshold=(int)(newCapacity*loadFactor);//创建新数组HashEntry<K,V>[]newTable=(HashEntry<K,V>[])newHashEntry[newCapacity];//新的掩码,如从16扩容到32,那么sizeMask为31,对应二进制‘000...00011111’intsizeMask=newCapacity-1;//遍历原数组,将原数组位置i处的链表拆分到新数组位置i和i+oldCap两个位置for(inti=0;i<oldCapacity;i++){//e是链表的第一个元素HashEntry<K,V>e=oldTable[i];if(e!=null){HashEntry<K,V>next=e.next;//计算应该放置在新数组中的位置,//假设原数组长度为16,e在oldTable[3]处,那么idx只可能是3或者是3+16=19intidx=e.hash&sizeMask;//新位置if(nextnull)//该位置处只有一个元素newTable[idx]=e;else{//Reuseconsecutivesequenceatsameslot//e是链表表头HashEntry<K,V>lastRun=e;//idx是当前链表的头结点e的新位置intlastIdx=idx;//for循环找到一个lastRun结点,这个结点之后的所有元素是将要放到一起的for(HashEntry<K,V>last=next;last!=null;last=last.next){intk=last.hash&sizeMask;if(k!=lastIdx){lastIdx=k;lastRun=last;}}//将lastRun及其之后的所有结点组成的这个链表放到lastIdx这个位置newTable[lastIdx]=lastRun;//下面的操作是处理lastRun之前的结点,//这些结点可能分配在另一个链表中,也可能分配到上面的那个链表中for(HashEntry<K,V>p=e;p!=lastRun;p=p.next){Vv=p.value;inth=p.hash;intk=h&sizeMask;HashEntry<K,V>n=newTable[k];newTable[k]=newHashEntry<K,V>(h,p.key,v,n);}}}}//将新来的node放到新数组中刚刚的两个链表之一的头部intnodeIndex=node.hash&sizeMask;//addthenewnodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex]=node;table=newTable;} CAS操作 在JDK7里在 ConcurrentHashMap中通过原子操作 sun.misc.Unsafe查找元素、替换元素和设置元素。通过这样的硬件级别获得数据可以保证及时是多线程我也每次获得的数据是最新的。这些原子操作起着非常关键的作用,你可以在所有 ConcurrentHashMap的基本功能中看到,随机距离如下: finalvoidsetNext(HashEntry<K,V>n){UNSAFE.putOrderedObject(this,nextOffset,n);}staticfinal<K,V>HashEntry<K,V>entryAt(HashEntry<K,V>[]tab,inti){return(tabnull)?null:(HashEntry<K,V>)UNSAFE.getObjectVolatile(tab,((long)i<<TSHIFT)+TBASE);}staticfinal<K,V>voidsetEntryAt(HashEntry<K,V>[]tab,inti,HashEntry<K,V>e){UNSAFE.putOrderedObject(tab,((long)i<<TSHIFT)+TBASE,e);} 常见问题 ConcurrentHashMap实现原理是怎么样的或者ConcurrentHashMap如何在保证高并发下线程安全的同时实现了性能提升? ❝ ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的HashTable,只要多个修改操作发生在不同的段上,它们就可以并发进行。 ❞ 在高并发下的情况下如何保证取得的元素是最新的? ❝ 用于存储键值对数据的HashEntry,在设计上它的成员变量value跟next都是volatile类型的,这样就保证别的线程对value值的修改,get方法可以马上看到。 ❞ ConcurrentHashMap的弱一致性体现在迭代器,clear和get方法,原因在于没有加锁。 比如迭代器在遍历数据的时候是一个Segment一个Segment去遍历的,如果在遍历完一个Segment时正好有一个线程在刚遍历完的Segment上插入数据,就会体现出不一致性。clear也是一样。 get方法和containsKey方法都是遍历对应索引位上所有节点,都是不加锁来判断的,如果是修改性质的因为可见性的存在可以直接获得最新值,不过如果是新添加值则无法保持一致性。 JDK8 JDK8相比与JDK7主要区别如下: ❝ 取消了segment数组,直接用table保存数据,锁的粒度更小,减少并发冲突的概率。采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率,并发控制使用Synchronized和CAS来操作。 存储数据时采用了数组+ 链表+红黑树的形式。 ❞ CurrentHashMap重要参数: ❝ private static final int MAXIMUM_CAPACITY = 1 << 30; // 数组的最大值 private static final int DEFAULT_CAPACITY = 16; // 默认数组长度 static final int TREEIFY_THRESHOLD = 8; // 链表转红黑树的一个条件 static final int UNTREEIFY_THRESHOLD = 6; // 红黑树转链表的一个条件 static final int MIN_TREEIFY_CAPACITY = 64; // 链表转红黑树的另一个条件 static final int MOVED = -1; // 表示正在扩容转移 static final int TREEBIN = -2; // 表示已经转换成树 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // 获得hash值的辅助参数 transient volatile Node<K,V>[] table;// 默认没初始化的数组,用来保存元素 private transient volatile Node<K,V>[] nextTable; // 转移的时候用的数组 static final int NCPU = Runtime.getRuntime().availableProcessors();// 获取可用的CPU个数 private transient volatile Node<K,V>[] nextTable; // 连接表,用于哈希表扩容,扩容完成后会被重置为 null private transient volatile long baseCount;保存着整个哈希表中存储的所有的结点的个数总和,有点类似于 HashMap 的 size 属性。private transient volatile int sizeCtl; 负数:表示进行初始化或者扩容,-1:表示正在初始化,-N:表示有 N-1 个线程正在进行扩容 正数:0 表示还没有被初始化,> 0的数:初始化或者是下一次进行扩容的阈值,有点类似HashMap中的threshold,不过功能「更强大」。 ❞ 若干重要类 构成每个元素的基本类 Node staticclassNode<K,V>implementsMap.Entry<K,V>{finalinthash;//key的hash值finalKkey;//keyvolatileVval;//valuevolatileNode<K,V>next;//表示链表中的下一个节点} TreeNode继承于Node,用来存储红黑树节点 staticfinalclassTreeNode<K,V>extendsNode<K,V>{TreeNode<K,V>parent;//红黑树的父亲节点TreeNode<K,V>left;//左节点TreeNode<K,V>right;//右节点TreeNode<K,V>prev;//前节点booleanred;//是否为红点} ForwardingNode 在 Node 的子类 ForwardingNode 的构造方法中,可以看到此变量的hash = 「-1」 ,类中还存储 nextTable的引用。该初始化方法只在 transfer方法被调用,如果一个类被设置成此种情况并且hash = -1 则说明该节点不需要resize了。 staticfinalclassForwardingNode<K,V>extendsNode<K,V>{finalNode<K,V>[]nextTable;ForwardingNode(Node<K,V>[]tab){//注意这里super(MOVED,null,null,null);this.nextTable=tab;}//.....} TreeBin TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制. staticfinalclassTreeBin<K,V>extendsNode<K,V>{TreeNode<K,V>root;volatileTreeNode<K,V>first;volatileThreadwaiter;volatileintlockState;//valuesforlockStatestaticfinalintWRITER=1;//setwhileholdingwritelockstaticfinalintWAITER=2;//setwhenwaitingforwritelockstaticfinalintREADER=4;//incrementvalueforsettingreadlock} 构造函数 整体的构造情况基本跟HashMap类似,并且为了跟原来的JDK7中的兼容性还可以传入并发度。不过JDK8中并发度已经有table的具体长度来控制了。 ❝ ConcurrentHashMap():创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 ConcurrentHashMap(int):创建一个带有指定初始容量 tableSizeFor、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 ConcurrentHashMap(Map<? extends K, ? extends V> m):构造一个与给定映射具有相同映射关系的新映射 ConcurrentHashMap(int initialCapacity, float loadFactor):创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (1) 的新的空映射 ConcurrentHashMap(int, float, int):创建一个带有指定初始容量、加载因子和并发级别的新的空映射。 ❞ put 假设table已经初始化完成,put操作采用 CAS + synchronized 实现并发插入或更新操作,具体实现如下。 ❝ 做一些边界处理,然后获得hash值。 没初始化就初始化,初始化后看下对应的桶是否为空,为空就原子性的尝试插入。 如果当前节点正在扩容还要去帮忙扩容,骚操作。 用 syn来加锁当前节点,然后操作几乎跟就跟hashmap一样了。 ❞ // Node 节点的 hash值在HashMap中存储的就是hash值,在currenthashmap中可能有多种情况哦!finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){if(keynull||valuenull)thrownewNullPointerException();//边界处理inthash=spread(key.hashCode());//最终hash值计算intbinCount=0;for(Node<K,V>[]tab=table;;){//循环表Node<K,V>f;intn,i,fh;if(tabnull||(n=tab.length)0)tab=initTable();//初始化表如果为空,懒汉式elseif((f=tabAt(tab,i=(n-1)&hash))null){//如果对应桶位置为空if(casTabAt(tab,i,null,newNode<K,V>(hash,key,value,null)))//CAS原子性的尝试插入break;}elseif((fh=f.hash)MOVED)//如果当前节点正在扩容。还要帮着去扩容。tab=helpTransfer(tab,f);else{VoldVal=null;synchronized(f){//桶存在数据加锁操作进行处理if(tabAt(tab,i)f){if(fh>=0){//如果存储的是链表存储的是节点的hash值binCount=1;for(Node<K,V>e=f;;++binCount){Kek;//遍历链表去查找,如果找到key一样则选择性if(e.hashhash&&((ek=e.key)key||(ek!=null&&key.equals(ek)))){oldVal=e.val;if(!onlyIfAbsent)e.val=value;break;}Node<K,V>pred=e;if((e=e.next)null){//找到尾部插入pred.next=newNode<K,V>(hash,key,value,null);break;}}}elseif(finstanceofTreeBin){//如果桶节点类型为TreeBinNode<K,V>p;binCount=2;if((p=((TreeBin<K,V>)f).putTreeVal(hash,key,value))!=null){//尝试红黑树插入,同时也要防止节点本来就有,选择性覆盖oldVal=p.val;if(!onlyIfAbsent)p.val=value;}}}}if(binCount!=0){//如果链表数量if(binCount>=TREEIFY_THRESHOLD)treeifyBin(tab,i);//链表转红黑树哦!if(oldVal!=null)returnoldVal;break;}}}addCount(1L,binCount);//统计大小并且检查是否要扩容。returnnull;} 涉及到重要函数initTable、tabAt、casTabAt、helpTransfer、putTreeVal、treeifyBin、addCount函数。 initTable 「只允许一个线程」对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度Thread.yield。这样,保证了表同时只会被一个线程初始化,对于table的大小,会根据sizeCtl的值进行设置,如果没有设置szieCtl的值,那么默认生成的table大小为16,否则,会根据sizeCtl的大小设置table大小。 //容器初始化操作privatefinalNode<K,V>[]initTable(){Node<K,V>[]tab;intsc;while((tab=table)null||tab.length0){if((sc=sizeCtl)<0)//如果正在初始化-1,-N 正在扩容。Thread.yield();//进行线程让步等待//让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。//它可能会获取到,也有可能被其他线程获取到。elseif(U.compareAndSwapInt(this,SIZECTL,sc,-1)){//比较sizeCtl的值与sc是否相等,相等则用-1 替换,这表明我这个线程在进行初始化了!try{if((tab=table)null||tab.length0){intn=(sc>0)?sc:DEFAULT_CAPACITY;//默认为16@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n];table=tab=nt;sc=n-(n>>>2);//sc=0.75n}}finally{sizeCtl=sc;//设置sizeCtl类似threshold}break;}}returntab;} unsafe 在ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制。 //用来返回节点数组的指定位置的节点的原子操作@SuppressWarnings("unchecked")staticfinal<K,V>Node<K,V>tabAt(Node<K,V>[]tab,inti){return(Node<K,V>)U.getObjectVolatile(tab,((long)i<<ASHIFT)+ABASE);}//cas原子操作,在指定位置设定值staticfinal<K,V>booleancasTabAt(Node<K,V>[]tab,inti,Node<K,V>c,Node<K,V>v){returnU.compareAndSwapObject(tab,((long)i<<ASHIFT)+ABASE,c,v);}//原子操作,在指定位置设定值staticfinal<K,V>voidsetTabAt(Node<K,V>[]tab,inti,Node<K,V>v){U.putObjectVolatile(tab,((long)i<<ASHIFT)+ABASE,v);}//比较table数组下标为i的结点是否为c,若为c,则用v交换操作。否则,不进行交换操作。staticfinal<K,V>booleancasTabAt(Node<K,V>[]tab,inti,Node<K,V>c,Node<K,V>v){returnU.compareAndSwapObject(tab,((long)i<<ASHIFT)+ABASE,c,v);} 可以看到获得table[i]数据是通过Unsafe对象通过反射获取的,取数据直接table[index]不可以么,为什么要这么复杂?在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的「副本」,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,「保证了每次拿到数据都是最新的」。 helpTransfer //可能有多个线程在同时帮忙运行helpTransferfinalNode<K,V>[]helpTransfer(Node<K,V>[]tab,Node<K,V>f){Node<K,V>[]nextTab;intsc;if(tab!=null&&(finstanceofForwardingNode)&&(nextTab=((ForwardingNode<K,V>)f).nextTable)!=null){//table不是空且node节点是转移类型,并且转移类型的nextTable不是空说明还在扩容ingintrs=resizeStamp(tab.length);//根据 length 得到一个前16位的标识符,数组容量大小。//确定新table指向没有变,老table数据也没变,并且此时sizeCtl小于0还在扩容ingwhile(nextTabnextTable&&tabletab&&(sc=sizeCtl)<0){if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||transferIndex<=0)//1.sizeCtl无符号右移16位获得高16位如果不等rs标识符变了// 2. 如果扩容结束了这里可以看 trePresize 函数第一次扩容操作://默认第一个线程设置sc=rs左移16位+2,当第一个线程结束扩容了,//就会将 sc 减一。这个时候,sc 就等于 rs + 1。//3.如果达到了最大帮助线程个数65535个//4.如果转移下标调整ing扩容已经结束了break;if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1)){//如果以上都不是,将sizeCtl+1,增加一个线程来扩容transfer(tab,nextTab);//进行转移break;//结束循环}}returnnextTab;}returntable;} Integer.numberOfLeadingZeros(n) ❝ 该方法的作用是「返回无符号整型i的最高非零位前面的0的个数」,包括符号位在内;如果i为负数,这个方法将会返回0,符号位为1. 比如说,10的二进制表示为 0000 0000 0000 0000 0000 0000 0000 1010 java的整型长度为32位。那么这个方法返回的就是28 ❞ resizeStamp 主要用来获得标识符,可以简单理解是对当前系统容量大小的一种监控。 staticfinalintresizeStamp(intn){returnInteger.numberOfLeadingZeros(n)|(1<<(RESIZE_STAMP_BITS-1));//RESIZE_STAMP_BITS=16} addCount 主要就2件事:一是更新 baseCount,二是判断是否需要扩容。 privatefinalvoidaddCount(longx,intcheck){CounterCell[]as;longb,s;//首先如果没有并发此时countCells is null, 此时尝试CAS设置数据值。if((as=counterCells)!=null||!U.compareAndSwapLong(this,BASECOUNT,b=baseCount,s=b+x)){//如果counterCells不为空以为此时有并发的设置或者CAS设置baseCount失败了CounterCella;longv;intm;booleanuncontended=true;if(asnull||(m=as.length-1)<0||(a=as[ThreadLocalRandom.getProbe()&m])null||!(uncontended=U.compareAndSwapLong(a,CELLVALUE,v=a.value,v+x))){//1.如果没出现并发此时计数盒子为null//2.随机取出一个数组位置发现为空//3.出现并发后修改这个cellvalue失败了//执行funAddCountfullAddCount(x,uncontended);//死循环操作return;}if(check<=1)return;s=sumCount();//吧counterCells数组中的每一个数据进行累加给baseCount。}//如果需要扩容if(check>=0){Node<K,V>[]tab,nt;intn,sc;while(s>=(long)(sc=sizeCtl)&&(tab=table)!=null&&(n=tab.length)<MAXIMUM_CAPACITY){intrs=resizeStamp(n);//获得高位标识符if(sc<0){//是否需要帮忙去扩容if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||(nt=nextTable)null||transferIndex<=0)break;if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1))transfer(tab,nt);}//第一次扩容elseif(U.compareAndSwapInt(this,SIZECTL,sc,(rs<<RESIZE_STAMP_SHIFT)+2))transfer(tab,null);s=sumCount();}}} baseCount添加 ConcurrentHashMap提供了 baseCount、 counterCells 两个辅助变量和一个 CounterCell辅助内部类。sumCount() 就是迭代 counterCells来统计 sum 的过程。put 操作时,肯定会影响 size(),在 put() 方法最后会调用 addCount()方法。整体的思维方法跟LongAdder类似,用的思维就是借鉴的 ConcurrentHashMap。每一个 Cell都用Contended修饰来避免伪共享。 ❝ JDK1.7 和 JDK1.8 对 size 的计算是不一样的。1.7 中是先不加锁计算三次,如果三次结果不一样在加锁。 JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。 JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。 ❞ 关于扩容 在 addCount第一次扩容时候会有骚操作 sc=rs << RESIZE_STAMP_SHIFT) + 2)其中 rs = resizeStamp(n)。这里需要核心说一点, 如果不是第一次扩容则直接将低16位的数字 +1 即可。 putTreeVal 这个操作几乎跟HashMap的操作完全一样,核心思想就是一定要决定向左还是向右然后最终尝试放置新数据,然后balance。不同点就是有锁的考虑。 treeifyBin 这里的基本思路跟HashMap几乎一样,不同点就是先变成TreeNode,然后是「单向链表」串联。 privatefinalvoidtreeifyBin(Node<K,V>[]tab,intindex){Node<K,V>b;intn,sc;if(tab!=null){//如果整个table的数量小于64,就扩容至原来的一倍,不转红黑树了//因为这个阈值扩容可以减少hash冲突,不必要去转红黑树if((n=tab.length)<MIN_TREEIFY_CAPACITY)tryPresize(n<<1);elseif((b=tabAt(tab,index))!=null&&b.hash>=0){synchronized(b){//锁定当前桶if(tabAt(tab,index)b){TreeNode<K,V>hd=null,tl=null;for(Node<K,V>e=b;e!=null;e=e.next){//遍历这个链表然后将每个节点封装成TreeNode,最终单链表串联起来,//最终调用setTabAt放置红黑树TreeNode<K,V>p=newTreeNode<K,V>(e.hash,e.key,e.val,null,null);if((p.prev=tl)null)hd=p;elsetl.next=p;tl=p;}//通过TreeBin对象对TreeNode转换成红黑树setTabAt(tab,index,newTreeBin<K,V>(hd));}}}}} TreeBin 主要功能就是链表变化为红黑树,这个红黑树用TreeBin来包装。并且要注意 转成红黑树以后以前链表的结构信息还是有的,最终信息如下: TreeBin.first = 链表中第一个节点。 TreeBin.root = 红黑树中的root节点。 TreeBin(TreeNode<K,V>b){super(TREEBIN,null,null,null);//创建空节点hash=-2this.first=b;TreeNode<K,V>r=null;//root节点for(TreeNode<K,V>x=b,next;x!=null;x=next){next=(TreeNode<K,V>)x.next;x.left=x.right=null;if(rnull){x.parent=null;x.red=false;r=x;//root节点设置为x}else{Kk=x.key;inth=x.hash;Class<?>kc=null;for(TreeNode<K,V>p=r;;){//x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点intdir,ph;Kpk=p.key;if((ph=p.hash)>h)dir=-1;elseif(ph<h)dir=1;elseif((kcnull&&(kc=comparableClassFor(k))null)||(dir=compareComparables(kc,k,pk))0)dir=tieBreakOrder(k,pk);//当key不可以比较,或者相等的时候采取的一种排序措施TreeNode<K,V>xp=p;//放一定是放在叶子节点上,如果还没找到叶子节点则进行循环往下找。//找到了目前叶子节点才会进入再放置数据if((p=(dir<=0)?p.left:p.right)null){x.parent=xp;if(dir<=0)xp.left=x;elsexp.right=x;r=balanceInsertion(r,x);//每次插入一个元素的时候都调用balanceInsertion来保持红黑树的平衡break;}}}}this.root=r;assertcheckInvariants(root);} tryPresize 当数组长度小于64的时候,扩张数组长度一倍,调用此函数。扩容后容量大小的核对,可能涉及到初始化容器大小。并且扩容的时候又跟2的次幂联系上了!,其中初始化时候传入map会调用putAll方法直接put一个map的话,在「putAll」方法中没有调用initTable方法去初始化table,而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断。 PS:默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1,这个时候说明扩容完毕了。 /***扩容表为指可以容纳指定个数的大小(总是2的N次方)*假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12*计算出来c的值为64,则要扩容到sizeCtl≥c*第一次扩容之后数组长:32 sizeCtl:24*第三次扩容之后数组长:128 sizeCtl:96 退出*/privatefinalvoidtryPresize(intsize){intc=(size>=(MAXIMUM_CAPACITY>>>1))?MAXIMUM_CAPACITY:tableSizeFor(size+(size>>>1)+1);//合理范围intsc;while((sc=sizeCtl)>=0){Node<K,V>[]tab=table;intn;if(tabnull||(n=tab.length)0){//初始化传入map,今天putAll会直接调用这个。n=(sc>c)?sc:c;if(U.compareAndSwapInt(this,SIZECTL,sc,-1)){//初始化tab的时候,把sizeCtl设为-1try{if(tabletab){@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n];table=nt;sc=n-(n>>>2);//sc=sizeCtl=0.75n}}finally{sizeCtl=sc;}}}//初始化时候如果数组容量<=sizeCtl或容量已经最大化了则退出elseif(c<=sc||n>=MAXIMUM_CAPACITY){break;//退出扩张}elseif(tabtable){intrs=resizeStamp(n);if(sc<0){//sc=siztCtl如果正在扩容Table的话,则帮助扩容Node<K,V>[]nt;if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||(nt=nextTable)null||transferIndex<=0)break;//各种条件判断是否需要加入扩容工作。//帮助转移数据的线程数+1if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1))transfer(tab,nt);}//没有在初始化或扩容,则开始扩容//此处切记第一次扩容直接+2elseif(U.compareAndSwapInt(this,SIZECTL,sc,(rs<<RESIZE_STAMP_SHIFT)+2)){transfer(tab,null);}}}} transfer 这里代码量比较大主要分文三部分,并且感觉思路很精髓,尤其「是其他线程帮着去扩容的骚操作」。 主要是 单个线程能处理的最少桶结点个数的计算和一些属性的初始化操作。 每个线程进来会先领取自己的任务区间 [bound,i],然后开始 --i 来遍历自己的任务区间,对每个桶进行处理。如果遇到桶的头结点是空的,那么使用 ForwardingNode标识旧table中该桶已经被处理完成了。如果遇到已经处理完成的桶,直接跳过进行下一个桶的处理。如果是正常的桶,对桶首节点加锁,正常的迁移即可(跟HashMap第三部分一样思路),迁移结束后依然会将原表的该位置标识位已经处理。 该函数中的finish= true 则说明整张表的迁移操作已经「全部」完成了,我们只需要重置 table的引用并将 nextTable 赋为空即可。否则,CAS 式的将 sizeCtl减一,表示当前线程已经完成了任务,退出扩容操作。如果退出成功,那么需要进一步判断当前线程是否就是最后一个在执行扩容的。 f((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT)return; 第一次扩容时在addCount中有写到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示当前只有一个线程正在工作,「相对应的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,说明当前线程就是最后一个还在扩容的线程,那么会将 finishing 标识为 true,并在下一次循环中退出扩容方法。 几乎跟 HashMap大致思路类似的遍历链表/红黑树然后扩容操作。 privatefinalvoidtransfer(Node<K,V>[]tab,Node<K,V>[]nextTab){intn=tab.length,stride;if((stride=(NCPU>1)?(n>>>3)/NCPU:n)<MIN_TRANSFER_STRIDE)//MIN_TRANSFER_STRIDE用来控制不要占用太多CPUstride=MIN_TRANSFER_STRIDE;//subdividerange//MIN_TRANSFER_STRIDE=16每个CPU处理最小长度个数if(nextTabnull){//新表格为空则直接新建二倍,别的辅助线程来帮忙扩容则不会进入此if条件try{@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n<<1];nextTab=nt;}catch(Throwableex){//trytocopewithOOMEsizeCtl=Integer.MAX_VALUE;return;}nextTable=nextTab;transferIndex=n;//transferIndex指向最后一个桶,方便从后向前遍历}intnextn=nextTab.length;//新表长度ForwardingNode<K,V>fwd=newForwardingNode<K,V>(nextTab);//创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点booleanadvance=true;//是否继续向前查找的标志位booleanfinishing=false;//toensuresweep(清扫)beforecommittingnextTab,在完成之前重新在扫描一遍数组,看看有没完成的没//第一部分// i 指向当前桶, bound 指向当前线程需要处理的桶结点的区间下限【bound,i】这样来跟线程划分任务。for(inti=0,bound=0;;){Node<K,V>f;intfh;//这个while循环的目的就是通过--i遍历当前线程所分配到的桶结点//一个桶一个桶的处理while(advance){//每一次成功处理操作都会将advance设置为true,然里来处理区间的上一个数据intnextIndex,nextBound;if(--i>=bound||finishing){//通过此处进行任务区间的遍历advance=false;}elseif((nextIndex=transferIndex)<=0){i=-1;//任务分配完了advance=false;}//更新transferIndex//为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)elseif(U.compareAndSwapInt(this,TRANSFERINDEX,nextIndex,nextBound=(nextIndex>stride?nextIndex-stride:0))){//nextIndex本来等于末尾数字,bound=nextBound;i=nextIndex-1;advance=false;}}//当前线程所有任务完成if(i<0||i>=n||i+n>=nextn){intsc;if(finishing){//已经完成转移则直接赋值操作nextTable=null;table=nextTab;sizeCtl=(n<<1)-(n>>>1);//设置sizeCtl为扩容后的0.75return;}if(U.compareAndSwapInt(this,SIZECTL,sc=sizeCtl,sc-1)){// sizeCtl-1 表示当前线程任务完成。if((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT){//判断当前线程完成的线程是不是最后一个在扩容的,思路精髓return;}finishing=advance=true;//如果是则相应的设置参数i=n;}}elseif((f=tabAt(tab,i))null)//数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])advance=casTabAt(tab,i,null,fwd);//如果老节点数据是空的则直接进行CAS设置为fwdelseif((fh=f.hash)MOVED)//已经是个fwd了,因为是多线程操作可能别人已经给你弄好了,advance=true;//alreadyprocessedelse{synchronized(f){//加锁操作if(tabAt(tab,i)f){Node<K,V>ln,hn;if(fh>=0){//该节点的hash值大于等于0,说明是一个Node节点//关于链表的操作整体跟HashMap类似不过感觉好像更扰一些。intrunBit=fh&n;//fh=f.hashfirsthash的意思,看第一个点放老位置还是新位置Node<K,V>lastRun=f;for(Node<K,V>p=f.next;p!=null;p=p.next){intb=p.hash&n;//n的值为扩张前的数组的长度if(b!=runBit){runBit=b;lastRun=p;//最后导致发生变化的节点}}if(runBit0){//看最后一个变化点是新还是旧旧ln=lastRun;hn=null;}else{hn=lastRun;//看最后一个变化点是新还是旧旧ln=null;}/**构造两个链表,顺序大部分和原来是反的,不过顺序也有差异*分别放到原来的位置和新增加的长度的相同位置(i/n+i)*/for(Node<K,V>p=f;p!=lastRun;p=p.next){intph=p.hash;Kpk=p.key;Vpv=p.val;if((ph&n)0)/**假设runBit的值为0,*则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点*并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/ln=newNode<K,V>(ph,pk,pv,ln);else/**假设runBit的值不为0,*则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点*并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/hn=newNode<K,V>(ph,pk,pv,hn);}setTabAt(nextTab,i,ln);setTabAt(nextTab,i+n,hn);setTabAt(tab,i,fwd);advance=true;}elseif(finstanceofTreeBin){//该节点hash值是个负数否则的话是一个树节点TreeBin<K,V>t=(TreeBin<K,V>)f;TreeNode<K,V>lo=null,loTail=null;//旧头尾TreeNode<K,V>hi=null,hiTail=null;//新头围intlc=0,hc=0;for(Node<K,V>e=t.first;e!=null;e=e.next){inth=e.hash;TreeNode<K,V>p=newTreeNode<K,V>(h,e.key,e.val,null,null);if((h&n)0){if((p.prev=loTail)null)lo=p;elseloTail.next=p;//旧头尾设置loTail=p;++lc;}else{//新头围设置if((p.prev=hiTail)null)hi=p;elsehiTail.next=p;hiTail=p;++hc;}}//ln如果老位置数字<=6则要对老位置链表进行红黑树降级到链表,否则就看是否还需要对老位置数据进行新建红黑树ln=(lc<=UNTREEIFY_THRESHOLD)?untreeify(lo):(hc!=0)?newTreeBin<K,V>(lo):t;hn=(hc<=UNTREEIFY_THRESHOLD)?untreeify(hi):(lc!=0)?newTreeBin<K,V>(hi):t;setTabAt(nextTab,i,ln);setTabAt(nextTab,i+n,hn);setTabAt(tab,i,fwd);//老表中i位置节点设置下advance=true;}}}}}} get 这个就很简单了,获得hash值,然后判断存在与否,遍历链表即可,注意get没有任何锁操作! publicVget(Objectkey){Node<K,V>[]tab;Node<K,V>e,p;intn,eh;Kek;//计算key的hash值inth=spread(key.hashCode());if((tab=table)!=null&&(n=tab.length)>0&&(e=tabAt(tab,(n-1)&h))!=null){//表不为空并且表的长度大于0并且key所在的桶不为空if((eh=e.hash)h){//表中的元素的hash值与key的hash值相等if((ek=e.key)key||(ek!=null&&key.equals(ek)))//键相等//返回值returne.val;}elseif(eh<0)//是个TreeBinhash=-2//在红黑树中查找,因为红黑树中也保存这一个链表顺序return(p=e.find(h,key))!=null?p.val:null;while((e=e.next)!=null){//对于结点hash值大于0的情况链表if(e.hashh&&((ek=e.key)key||(ek!=null&&key.equals(ek))))returne.val;}}returnnull;} clear 关于清空也相对简单 ,无非就是遍历桶数组,然后通过CAS来置空。 publicvoidclear(){longdelta=0L;inti=0;Node<K,V>[]tab=table;while(tab!=null&&i<tab.length){intfh;Node<K,V>f=tabAt(tab,i);if(fnull)++i;//这个桶是空的直接跳过elseif((fh=f.hash)MOVED){//这个桶的数据还在扩容中,要去扩容同时等待。tab=helpTransfer(tab,f);i=0;//restart}else{synchronized(f){//真正的删除if(tabAt(tab,i)f){Node<K,V>p=(fh>=0?f:(finstanceofTreeBin)?((TreeBin<K,V>)f).first:null);//循环到链表/者红黑树的尾部while(p!=null){--delta;//记录删除了多少个p=p.next;}//利用CAS无锁置nullsetTabAt(tab,i++,null);}}}}if(delta!=0L)addCount(delta,-1);//调整count} end ConcurrentHashMap是如果来做到「并发安全」,又是如何做到「高效」的并发的呢? 首先是读操作,读源码发现get方法中根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的。 写操作 . 数据扩容函数是 transfer,该方法的只有 addCount, helpTransfer和 tryPresize这三个方法来调用。 addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。 helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的·hash·= MOVED 的时候,就会调用 helpTransfer方法,在 helpTransfer中再调用 transfer方法来帮助完成数组的扩容 ❝ tryPresize是在 treeIfybin和 putAll方法中调用, treeIfybin主要是在 put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。注意 putAll在初始化传入一个大map的时候会调用。· ❞ 总结扩容情况发生: ❝ 在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容。 当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容 ❞ 3. 扩容时候是否可以进行读写。 ❝ 对于读操作,因为是没有加锁的所以可以的. 对于写操作,JDK8中已经将锁的范围细腻到table[i]l了,当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。 ❞ 多个线程又是如何同步处理的 在 ConcurrentHashMap中,同步处理主要是通过 Synchronized和 unsafe的硬件级别原子性 这两种方式来完成的。 ❝ 在取得sizeCtl跟某个位置的Node的时候,使用的都是 unsafe的方法,来达到并发安全的目的 当需要在某个位置设置节点的时候,则会通过 Synchronized的同步机制来锁定该位置的节点。 在数组扩容的时候,则通过处理的 步长和 fwd节点来达到并发安全的目的,通过设置hash值为MOVED=-1。 当把某个位置的节点复制到扩张后的table的时候,也通过 Synchronized的同步机制来保证线程安全 ❞ 套路 ❝ 谈谈你理解的 HashMap,讲讲其中的 get put 过程。 1.8 做了什么优化? 是线程安全的嘛? 不安全会导致哪些问题? 如何解决?有没有线程安全的并发容器? ConcurrentHashMap 是如何实现的?1.7、1.8 实现有何不同,为什么这么做。 1.8中ConcurrentHashMap的sizeCtl作用,大致说下协助扩容跟标志位。 HashMap 为什么不用跳表替换红黑树呢? ❞ 参考 HashMap讲解 本文分享自微信公众号 - sowhat1412(sowhat9094)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

优秀的个人博客,低调大师

面试官:请说说什么是BFC?大白话讲清楚

BFC到底是什么东西 BFC 全称:Block Formatting Context, 名为 "块级格式化上下文"。 W3C官方解释为:BFC它决定了元素如何对其内容进行定位,以及与其它元素的关系和相互作用,当涉及到可视化布局时,Block Formatting Context提供了一个环境,HTML在这个环境中按照一定的规则进行布局。 简单来说就是,BFC是一个完全独立的空间(布局环境),让空间里的子元素不会影响到外面的布局。那么怎么使用BFC呢,BFC可以看做是一个CSS元素属性 怎样触发BFC 这里简单列举几个触发BFC使用的CSS属性 overflow: hidden display: inline-block position: absolute position: fixed display: table-cell display: flex BFC的规则 BFC就是一个块级元素,块级元素会在垂直方向一个接一个的排列 BFC就是页面中的一个隔离的独立容器,容器里的标签不会影响到外部标签 垂直方向的距离由margin决定, 属于同一个BFC的两个相邻的标签外边距会发生重叠 计算BFC的高度时,浮动元素也参与计算 BFC解决了什么问题 1.使用Float脱离文档流,高度塌陷 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>高度塌陷</title> <style> .box { margin: 100px; width: 100px; height: 100px; background: red; float: left; } .container { background: #000; } </style> </head> <body> <div class="container"> <div class="box"></div> <div class="box"></div> </div> </body> </html> 效果: 可以看到上面效果给box设置完float结果脱离文档流,使container高度没有被撑开,从而背景颜色没有颜色出来,解决此问题可以给container触发BFC,上面我们所说到的触发BFC属性都可以设置。 修改代码 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>高度塌陷</title> <style> .box { margin: 100px; width: 100px; height: 100px; background: red; float: left; } .container { background: #000; display: inline-block; } </style> </head> <body> <div class="container"> <div class="box"></div> <div class="box"></div> </div> </body> </html> 效果: 2.Margin边距重叠 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Document</title> <style> .box { margin: 10px; width: 100px; height: 100px; background: #000; } </style> </head> <body> <div class="container"> <div class="box"></div> <div class="box"></div> </div> </body> </html> 效果: 可以看到上面我们为两个盒子的margin外边距设置的是10px,可结果显示两个盒子之间只有10px的距离,这就导致了margin塌陷问题,这时margin边距的结果为最大值,而不是合,为了解决此问题可以使用BFC规则(为元素包裹一个盒子形成一个完全独立的空间,做到里面元素不受外面布局影响),或者简单粗暴方法一个设置margin,一个设置padding。 修改代码 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Margin边距重叠</title> <style> .box { margin: 10px; width: 100px; height: 100px; background: #000; } </style> </head> <body> <div class="container"> <div class="box"></div> <p><div class="box"></div></p> </div> </body> </html> 效果: 3.两栏布局 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>两栏布局</title> <style> div { width: 200px; height: 100px; border: 1px solid red; } </style> </head> <body> <div style="float: left;"> 两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局 </div> <div style="width: 300px;"> 我是蛙人,如有帮助请点个赞叭,如有帮助请点个赞叭,如有帮助请点个赞叭,如有帮助请点个赞叭,如有帮助请点个赞叭,如有帮助请点个赞叭 </div> </body> </html> 效果: 可以看到上面元素,第二个div元素为300px宽度,但是被第一个div元素设置Float脱离文档流给覆盖上去了,解决此方法我们可以把第二个div元素设置为一个BFC。 修改代码 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>两栏布局</title> <style> div { width: 200px; height: 100px; border: 1px solid red; } </style> </head> <body> <div style="float: left;"> 两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局两栏布局 </div> <div style="width: 300px;display:flex;"> 我是蛙人,如有帮助请点个赞叭,如有帮助请点个赞叭,如有帮助请点个赞叭,如有帮助请点个赞叭,如有帮助请点个赞叭,如有帮助请点个赞叭 </div> </body> </html> 效果: 结语 谢谢你读完本篇文章,希望对你能有所帮助,如有问题欢迎各位指正。 我是蛙人(✿◡‿◡),如果觉得写得可以的话,请点个赞吧❤。 感兴趣的小伙伴可以加入 [ 前端娱乐圈交流群 ] 欢迎大家一起来交流讨论 写作不易,「点赞」+「在看」+「转发」 谢谢支持❤ 往期好文 《分享15个Webpack实用的插件!!!》 《手把手教你写一个Vue组件发布到npm且可外链引入使用》 《分享12个Webpack中常用的Loader》 《聊聊什么是CommonJs和Es Module及它们的区别》 《带你轻松理解数据结构之Map》 《这些工作中用到的JavaScript小技巧你都知道吗?》 《【建议收藏】分享一些工作中常用的Git命令及特殊问题场景怎么解决》 参考 https://blog.csdn.net/weixin_43213962/article/details/105959869 https://blog.csdn.net/sinat_36422236/article/details/88763187

资源下载

更多资源
腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。