海量数据处理:从并发编程到分布式系统
本系列文章主要围绕高并发这一话题展开,分享笔者在并发处理上的学习思路以及踩过的坑。具体思路大体分为三部分:
本文将重点讲解第一部分——Java多线程编程。
一、Java内存模型与线程
并发编程主要讨论以下几点:
通常我们可以将物理计算机中出现的并发问题类比到JVM中的并发。
物理计算机处理器、高速缓存、主内存间交互关系如图:
处理器为提高性能,会对输入代码乱序执行(Out-Of-Order Execution) 优化。
类比Java内存模型,线程、主内存、工作内存交互关系如图:
JMM定义了程序中各个变量访问规则,即在虚拟机中将内存取出和存储的底层细节。
线程A如果要跟线程B要通信的话,必须经历以下两个步骤:
线程的工作内存中保存了该线程使用到变量的主内存副本拷贝(也可理解为此线程的私有拷贝),线程对变量的操作(读取、赋值等)都在工作内存中进行,而不能直接读写主内存中变量。不同线程之间的通信也需要通过主内存来完成。主内存对应Java堆中对象实例数据部分,而工作内存则对应虚拟机栈中部分区域。
在此还有非常重要的点需要提及!
指令重排序
执行程序时,为提高性能,编译器和处理器常常会对指令做出重排序。分三种:
JMM的编译器会禁止特定类型的编译器重排序,对于处理器重排序(后两者),则要求Java编译器在生成指令序列时,插入特定类型的内存屏障指令,通过内存屏障指令来禁止特定类型的处理器重排序。
内存之间的交互操作
JMM中定义了8种操作来描述工作内存与主内存之间的实现细节:
JMM规定了执行上述八种操作时必须满足的规则(与happens-before原则是等效的,即先行发生原则):
相关JVM补充内容请查阅:JVM-攻城掠地
https://juejin.im/post/5b066aa9f265da0dbf004dde
测试工具
PostMan、Apache Bench、JMeter、LoadRunner。
二、线程安全性
原子性:提供了互斥访问,同一时刻只能由一个线程来对它进行操作。
可见性:一个线程对主内存的修改可以及时被其他线程观察到。
有序性:一个线程观察其它线程中指令执行顺序,由于指令重排序的存在,观察的结果一般为杂乱无章的。Java程序的天然有序性可以总结为——如果本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无序的。前者指的是线程内的串行语义,后者指的是指令重排序和工作内存和主内存同步延迟现象。
1、原子性-Atomic包
AtomicXXX:
CAS、Unsafe.compareAndSwapInt
通过CAS来保证原子性,即Compare And Swap比较交换:
CAS利用处理器提供的CMPXCHG指令实现,自旋CAS实现的基本思路就是循环进行CAS直到成功为止。比较内存的值与预期的值,若相同则修改预期的值。
CAS虽然可以进行高效的进行原子操作,但是CAS仍在存在三大问题:
-
ABA问题。在Java1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。大部分情况下ABA问题并不影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更加高效;
-
循环时间长开销大;
-
只能保证一个共享变量进行的原子操作。
测试:
AtomicInteger
源码实现
AtomicLong与LongAdder
LongAdder实现热点数据的分离、更快,如果有并发更新可能会出现误差。底层用数组实现,其结果为数组的求和累加。
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } /** * Equivalent to {@code add(1)}. */ public void increment() { add(1L); }
AtomicBoolean
希望某件事情只执行一次。
AtomicIntegerFieldUpdater
以原子性更新类中某一个属性,这属性需要用volatile进行修饰。
AtomicStampedReference
作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子的方式将该引用和该标志的值设置为给定的更新值。
AtomicLongArray
维护数组
2、原子性-锁及对比
Atomic:竞争激烈的时候能维持常态,比Lock性能更好,只能同步一个值。
3、线程安全-可见性
导致共享变量在线程间不可见的原因:
JMM关于synchronizd的两条规定:
volatile-可见性通过加入内存屏障和禁止重排序优化实现:
必须符合以下场景才可使用:
原因:volatile变量在各个线程工作内存中不存在一致性问题,但是Java里面的运算并非原子性操作,导致volatile变量运算在并发下一样是不安全的。(可以通过反编译来验证)
volatile通常用来作为状态标记量
三、安全发布对象
发布对象:使一个对象能够被当前范围之外代码所使用。
对象逸出:一种错误的发布。当一个对象还没有构造完成,就能被其它线程所见。
安全发布对象:
-
在静态初始化函数中初始化一个对象的引用;
-
将对象的引用保存到volatile类型域或者AtomicReference对象中;
-
对象引用保存到某个正确构造对象final类型域中;
-
将对象的引用保存到一个由锁保护的域中。
对此,单例模式是个很好的学习例子:
通过枚举实现单例模式
四、线程安全策略
满足条件:
Guava:ImmutableXXX:Collection、List、Set、Map……
2、线程封闭
先检查再执行:if(condition(a)){handle(a);} →非原子操作
4、同步容器
注意:同步容器在某些场合并不一定可以做到线程安全。
ArrayList → CopyOnWriteArrayList
读取未加锁,写加锁。
public void add(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (index > len || index < 0) throw new IndexOutOfBoundsException("Index: "+index+ ", Size: "+len); Object[] newElements; int numMoved = len - index; if (numMoved == 0) newElements = Arrays.copyOf(elements, len + 1); else { newElements = new Object[len + 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index, newElements, index + 1, numMoved); } newElements[index] = element; setArray(newElements); } finally { lock.unlock(); } } public E get(int index) { return get(getArray(), index); }
↑ ConcurrentSkipListSet对批量操作不能保证原子性。
参考:JDK1.8源码分析之ConcurrentSkipListSet(八)
https://www.cnblogs.com/leesf456/p/5549820.html
↑ ConcurrentHashMap 效率相对而言要比 ConcurrentSkipListMap高,而同时 ConcurrentSkipListMap 则有些其不具有的特性:
ConcurrentHashMap
相比于HashTable采取更为高效的分段锁。ConcurrentHashMap里包含了一个Segment数组,一个Segment里包含了一个HashEntry数组。
Segment是一种可重入锁,扮演锁的角色;HashEntry则用于存储键值对数据。加锁/解锁是在Segment上。
ConcurrentLinkedQueu
非阻塞算法实现线程安全的队列。由head节点和tail节点组成,每个节点Node由节点元素item和指向下一个节点的next的引用组成,节点与节点之间同个这个next关联起来,组成链表结构的队列。
参考:
Java多线程(四)之ConcurrentSkipListMap深入分析
https://blog.csdn.net/guangcigeyun/article/details/8278349
探索 ConcurrentHashMap 高并发性的实现机制
https://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/index.html
五、J.U.C之AQS
AQS同步组件
1、等待多线程完成的CountDownLatch(JDK1.5)
允许一个或多个线程等待其他线程完成操作。
其构造函数接收一个int类型的参数作为计数器,调用CountDown方法的时候,计数器的值会减1,CountDownLatch的await方法会阻塞当前线程,直到N变为零。
应用:并行计算,解析Excel中多个Sheet的数据。
2、控制并发线程数的Semaphore
用来控制同时访问特定资源线程的数量。
应用:流量控制,特别是公共资源有限的场景,如数据库连接。
3、同步屏障CyclicBarrier
让一组线程达到一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,才会开门,所有被屏障拦截的线程才会继续执行。
应用:多线程计算数据,最后合并计算结果的场景。
CyclicBarrier和CountDownLatch的区别
CountDownLatch计数器只能使用一次,CyclicBarrier可以调用reset()方法重置。所以CyclicBarrier可以支持更加复杂的场景,如发生错误后重置计数器,并让线程重新执行。
//屏障拦截的线程数量 CyclicBarrier(int permits) //已经到达屏障 await() //CyclicBarrier阻塞线程的数量 getNumberWaiting()
4、重入锁ReentrantLock (排他锁:同时允许单个线程访问。)
支持重进入的锁,表示该锁能够支持一个线程对资源的重复加锁,即实现重进入:任意线程获取到锁之后能够再次获取该锁而不会被锁阻塞。
该锁支持获取锁时的公平和非公平性选择
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平锁就是等待时间最长的线程最优先获取锁,也就是说获取锁的是顺序的(FIFO),而非公平则允许插队。
非公平因为不保障顺序,则效率相对较高,而公平锁则可以减少饥饿发生的概率:
5、ReentrantReadWriteLock (读写锁,实现悲观读取,同时允许多个线程访问
在写线程访问时,所有读线程和其他写线程均被堵塞。其维护了一对锁,通过分离读锁、写锁,使得并发性比排他锁有很大提升。
适用于读多写少的环境,能够提供比排他锁更好的并发与吞吐量。
不足:ReentrantReadWriteLock是读写锁,在多线程环境下,大多数情况是读的情况远远大于写的操作,因此可能导致写的饥饿问题。
StampedLock
是ReentrantReadWriteLock 的增强版,是为了解决ReentrantReadWriteLock的一些不足。
StampedLock读锁并不会阻塞写锁,设计思路也比较简单,就是在读的时候发现有写操作,再去读多一次。StampedLock有两种锁,一种是悲观锁,另外一种是乐观锁,如果线程拿到乐观锁就读和写不互斥,如果拿到悲观锁就读和写互斥。
参考: Java8对读写锁的改进:StampedLock
https://blog.csdn.net/sunfeizhi/article/details/52135136
6、Condition
Condition提供了类似Object的监视器方法,依赖Lock实现等待/通知模式。
参考:Java线程(九):Condition-线程通信更高效的方式
https://blog.csdn.net/ghsau/article/details/7481142
7、FutureTask
用于异步获取执行结果或取消执行任务的场景。(实现基于AQS)
参考:
Java并发编程:Callable、Future和FutureTask
https://www.cnblogs.com/dolphin0520/p/3949310.html
FutureTask的用法及两种常用的使用场景
https://blog.csdn.net/linchunquan/article/details/22382487
8、Fork/Join
并行执行任务,即把大任务分割成若干小任务并行执行,最后汇总成大任务结果的框架。
参考:Fork/Join 模式高级特性
https://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/index.html
9、BlocklingQueu
阻塞队列是一个支持两个附加操作的队列:
通常用于生产者和消费者场景。生产者是向队列里添加元素的线程,消费者是从列里获取元素的线程。阻塞队列就是生产者放元素,消费者获取元素的容器。(FIFO)
参考:Java中的阻塞队列
http://www.infoq.com/cn/articles/java-blocking-queue
六、线程与线程池
最后我们聊一下线程创建的相关问题。
线程的创建:
1、继承Thread类
public class TestThread extends Thread{ @Override public void run(){ } }
局限性:Java单根继承,不易于扩展。
2、实现Runnabl接口
public class TestThread implements Runnable{ @Override public void run(){ } }
运行Callable任务可以拿到一个Future对象,进行异步计算。
状态:
为了保证共享数据的完整性,Java中引入了互斥锁的概念,即每个对象对应一个“互斥锁”的标记(monitor),用来保证任何时刻只能有一个线程访问该对象。利用Java中每个对象都拥有唯一的一个监视锁(monitor),当线程拥有这个标记时才会允许访问这个资源,而未拿到标记则进入阻塞,进入锁池。每个对象都有自己的一个锁池的空间,用于存放等待线程。由系统决定哪个线程拿到锁标记并运行。
方法:
-
currentThread():当前调用线程的相关信息;
-
isAlive():判断当前线程是否处于活动状态;
-
getId():线程的唯一标识;
-
interrupted():测试当前线程是否已经中断;
注:线程终端状态由该方法清除,意味着连续两次执行此方法,第二次将返回false。
-
isInterrupted():测试线程是否已经中断;
注:不清楚状态标志。
-
run(): 线程执行的具体方法,执行完成的会进入消亡状态;
-
start():使县城出局就绪状态,等待调用线程的对象执行run()方法;
-
sleep():让当前线程放弃CPU时间片直接返回就绪状态。
-
yield():让当前线程放弃CPU时间片直接返回就绪状态。但放弃的时间片不确定,可能刚刚放弃,便立即获取。
线程通信
-
join(): 让当前线程邀请调用方法的线程优先执行,在被邀请的线程执行结束之前,邀请别人的线程不再执行,处于阻塞状态,直到被邀请的线程执行结束之后,进入就绪状态;
-
interrupt(): 中断、打断线程的阻塞状态。直接让阻塞状态的线程返回就绪,由sleep()、join()导致的阻塞立刻解除;
-
wait():使当前执行代码的线程放弃monitor并进入等待状态,直到接收到通知或被中断为止(notify)。即此时线程将释放自己的所有锁标记和CPU占用,同时进入这个对象的等待池(阻塞状态)。只能在同步代码块中调用(synchronized);
-
notify():在等待池中随机唤醒一个线程,放入锁池,对象处于等待状态,直到获取对象的锁标记为止。 只能在同步代码块中调用(synchronized)。
-
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。潜在问题线程如果创建过多可能内存溢出。
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
创建一个定长线程池,支持定时及周期性任务执行。
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO、LIFO、优先级)执行。
参考:Java 四种线程池的用法分析
https://blog.csdn.net/u011974987/article/details/51027795
暂且总结到这里。本文意在给大家提供学习的大体思路,其中有很多要点笔者并未深入剖析,比如ConcurrentHashMap,这块网络上有很多例子,可以参详。笔者也给出了很多不错的参考,大家可以根据个人需要点击阅读。如需详细了解,需具体阅读源码,多实践。
原文发布时间为:2018-06-13
本文作者: Mark

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Java并发编程 -- AQS
AbstractQueuedSynchronizer是为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供的一个框架。 1. 构造方法 /** * Creates a new {@code AbstractQueuedSynchronizer} instance * with initial synchronization state of zero. */ protected AbstractQueuedSynchronizer() { } 创建具有初始同步状态 0 的新 AbstractQueuedSynchronizer 实例。 2. 方法详细信息 方法有很多,因为aqs怎么是个框架,需要大概了解每个方法,不然谈不到使用。 主要有三大类:状态(state)、获得锁(acquire)、释放锁(release)、附加一个独占线程(ExclusiveOwnerThread) 这四个概念一定要非常清楚。不然很难学会。当然,队列和链表基础也得扎实。 好了,我基于官方文档和源码整理了以下方法(过目理解): getState protected final...
- 下一篇
C#对接----韵达开发平台--取电子面单
引子 最近根据业务的一些需求,所以放弃从快递鸟对接去电子面单,转而直接对接韵达开发平台:http://open.yundasys.com/ ,中间踩了一些坑,借此做了一个小案例给大伙,瞅瞅,若有需改进之处,还请指出!!! 废话不多数:首先咱先对韵达的一些接口参数了解清楚: 当然附上地址:http://open.yundasys.com/index.php?g=&m=ApiTools&a=exm 还有接口的一些SDK文件地址,这个就各位观众大老爷们自己去看了:http://open.yundasys.com/index.php?g=&m=ApiTools&a=apps&id=14 解决方案 上代码走起:基础参数的模型 using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace ConsoleApplication1 {//请求参数 class RequestVO { /// <summary> /// X...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS关闭SELinux安全模块
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7设置SWAP分区,小内存服务器的救世主