首页 文章 精选 留言 我的

精选列表

搜索[高并发],共10000篇文章
优秀的个人博客,低调大师

交易背书 - fabric交易并发的基础

Hyperledger Fabric和其他许多区块链的关键区别之一,就在于Fabric区块链的交易执行过程:Fabric交易需要首先通过节点的背书,然后再进行交易排序,最后才利用有序交易进行账本的更新。本文将介绍Hyperledger Fabric所采用的执行-排序-验证这一三步交易模型的工作原理,以及引入背书环节对Hyperledger Fabric区块链性能的有益作用。 Hyperledger Fabric相关开发教程: Java链码与应用开发详解 Node.js链码与应用开发详解 1、交易的生命周期:Hyperledger Fabric vs. 其他区块链 在其他区块链平台中,交易生命周期基本由如下环节构成: 排序:交易有序加入账本,然后扩散到所有节点 执行:交易在所有节点上按顺序依次执行并更新账本 为了让所有节点保持一致的状态,交易必须确定性执行,也就是说,无论何时何地,同样的交易必须产生同样的结果。这一要求对智能合约形成了很强的约束,也是智能合约通常需要使用领域特定语言(DSL)来开发的原因之一,因此使用像Java或Go这样的通用开发语言基本上无法保证确定性。 在Hyperledger Fabric中,交易的声明周期则有所不同: 执行:交易(通过智能合约)以任意顺序执行,甚至可以并行执行 排序:当足够数量的节点对交易结果达成一致,该交易就会 被加入账本并扩散给所有节点。 验证:每个节点验证并按顺序执行交易,从而更新账本 首先需要注意的一点是,交易的执行和对账本的实际更新被拆分为两个环节,这一拆分带来一些有益的作用: 所有节点都需要更新账本,因此所有节点都需要执行验证步骤。 但并不是所有的节点都需要执行智能合约。Hyperledger Fabric 使用背书策略来定义哪些节点需要执行交易。这意味着指定的 链码(智能合约)不必开放给所有的节点 —— 那些不在背书策略中 的节点不需要由访问链码的权限。 交易可以在被排序之前先执行。这是的节点可以并行执行交易, 从而提高系统的吞吐量 在Fabric的三步交易模型中,在交易被添加到账本之前,其链码 执行结果是显式达成一致的,其他区块链的两步交易模型使用 确定性的链码,本身就隐含了节点之间就智能合约的执行结果 达成一致。显式达成一致可以让Fabric使用非确定性的链码, 这也是为什么我们可以使用Go、Java和Node.js编写Fabric链码 的原因。 2、Hyperledger Fabric的交易背书策略 Hyperledger Fabric允许用户自己定义链码执行的策略。这些背书策略定义了在交易被加入账本之前,哪些节点需要就交易结果达成一致。Fabric提供了一个小型的DSL来定义背书策略。下面展示了一些背书策略样例: 节点A、B、C和F都需要对类型为T的交易进行背书 通道中的大部分节点必须对类型为U的交易进行背书 A、B、C、D、E、F、G中的至少3个节点必须对类型为V的交易进行背书 背书策略并不保证正确的链码安装在正确的节点上。然而,背书和安装链码的确存在类似的机制,我们将在后续教程中介绍这一点。 3、Hypereledger Fabric交易背书的实现机制 到目前为止,我们都是松散地使用交易(transaction)这一术语。在排序-执行模型中,链码执行和账本更新被合二为一 —— 交易。在Fabric中,这两个概念是分开的,因此交易本身也被拆分。 Fabric从交易提议(Transaction Proposal)开始。这是一个用来触发链码执行的数据包。交易提议被发送给用于背书的节点。背书节点执行链码,如果成功的话就生成一个实际的账本交易。背书节点签名建议并返回交易提议的响应,这是执行-排序-验证模型中的执行步骤。 一旦交易提议的创建者收到足够的可以满足背书策略的签名,它就可以将交易(以及签名)提交以便添加到账本中,这就是排序步骤。 4、结论 Hyperledger Fabric在区块链交易方面采取了一个新颖的思路,将智能合约的执行与账本的更新分开使它可以提高交易吞吐量,支持更细粒度的隐私控制,实现更灵活强大的智能合约。而这些特性得以实现的一个关键因素就是在交易加入账本之前进行显式地交易背书。 原文链接:Hyperledger Fabric交易背书的深入理解 - 汇智网

优秀的个人博客,低调大师

并发编程工具之一:CountDownLatch 用法

CountDownLatch 用法CountDownLatch是java.util.concurrent包中一个类,CountDownLatch只要提供的机制是多个(具体数量等于初始化CountDownLatch时count的值)线程都达到了预期状态或者完成了预期工作时触发事件,其他线程可以等待这个事件来触发自己后续的工作。等待的线程可以是多个,即CountDownLatch可以唤醒多个等待的线程。到达自己预期状态的线程会调用CountDownLatch的countDown方法,而等待的线程会调用CountDownLatch的await方法。 每次调用countDown方法时,计数都会减1,直到0为止,此时因调用await方法的阻塞线程被唤醒 代码例子: public static void main(String[] args) throws InterruptedException { CountDownLatch countDown = new CountDownLatch(1); CountDownLatch await = new CountDownLatch(5); // 依次创建并启动处于等待状态的5个MyRunnable线程 for (int i = 0; i < 5; ++i) { new Thread(new MyRunnable(countDown, await)).start(); } System.out.println("用于触发处于等待状态的线程开始工作......"); System.out.println("用于触发处于等待状态的线程工作完成,等待状态线程开始工作......"); countDown.countDown(); await.await(); System.out.println("Bingo!"); } public class MyRunnable implements Runnable { private final CountDownLatch countDown; private final CountDownLatch await; public MyRunnable(CountDownLatch countDown, CountDownLatch await) { this.countDown = countDown; this.await = await; } public void run() { try { countDown.await();//等待主线程执行完毕,获得开始执行信号... System.out.println("处于等待的线程开始自己预期工作......"); await.countDown();//完成预期工作,发出完成信号... } catch (InterruptedException e) { e.printStackTrace(); } } } 运行结果: 用于触发处于等待状态的线程开始工作...... 用于触发处于等待状态的线程工作完成,等待状态线程开始工作...... 处于等待的线程开始自己预期工作...... 处于等待的线程开始自己预期工作...... 处于等待的线程开始自己预期工作...... 处于等待的线程开始自己预期工作...... 处于等待的线程开始自己预期工作...... Bingo!

优秀的个人博客,低调大师

浅谈Java多线程与并发原理

前序 线程安全问题的主要诱因 存在共享数据(也称临界资源) 存在多条线程共同操作这些共享数据 解决方法:同一时刻有且只有一个线程在操作共享数据,其他线程必须等到该线程处理完数据后再对共享数据进行操作 互斥锁的特征 互斥性:即在同一时间只允许一个线程持有某个对象锁,通过这种特性来实现多线程协调机制,这样在同一时间只有一个线程对需要同步的代码块(复合操作)进行访问。互斥性也称为操作的原子性。可见性:必须确保在锁被释放之前,对共享变量所做的修改,对于随后获得该锁的另一个线程是可见的(即在获得锁时应该获得最新共享变量的值),否则另一个线程可能是在本地缓存的某个副本上继续操作,从而引起不一致。注:synchronized 锁的不是代码,锁的是对象 获取锁的分类:获取对象锁、获取类锁 获取对象锁的两种用法: 同步代码块(synchronized(this),s

优秀的个人博客,低调大师

Java并发编程之ThreadLocal源码分析

ThreadLocal介绍 ThreadLocal是JDK1.2提供的,作用是给单个线程内的共享变量提供载具。每个线程之间的ThreadLocal里的数据是相互隔离的,并随着线程的消亡而消亡。 使用 ThreadLocal提供了get(),set(T value),remove()3个对外方法。 1.调用get()获取值 2.调用set(T value)设置值 3.调用remove()删除值 使用中的坑 ThreadLocal常被用来做登入状态信息的存储。但是如果当前线程操作完不对状态信息做remove()可能会出现坑。我们拿购买商品举个例子: A用户已经登入,请求购买 ThreadLocal存储A用户信息。 获取ThreadLocal里A用户信息调用去请求购买接口,并返回成功。 A用户线程未被系统回收,待重复利用。 B用户也发起请求购买请求,并重用了A用户使用过的线程,此时B用户并未登入,所以跳过ThreadLocal存储B用户信息的逻辑。 正常情况下B用户会返回需要登入的提示,但此时ThreadLocal存储A用户信息并未被清除,获取A用户信息并调用去请求购买接口,并返回成功。 可以看到B用户使用了A用户的信息去购买了商品,正确的做法应该是每个线程使用结束后去remove()。 ThreadLocal原理 ThreadLocal的UML图如下 调用set方法真正的数据是存在ThreadLocalMap里的,而ThreadLocalMap是线程Thread的成员变量,所以说线程Thread被jvm回收后ThreadLocalMap也会被回收。ThreadLocalMap的实现是采用顺序存储结构哈希表,它跟HashMap不同,每个hash地址只能存一个数据。它key存的是ThreadLocal本身而且它的Entry继承至WeakReference,所以它的key如果没被强引用会在GC的触发的时候回收掉。 set(T value)源码分析 public void set(T value) { //获取当前线程 Thread t = Thread.currentThread(); //根据当前线程获取ThreadLocalMap ThreadLocalMap map = getMap(t); //如果map不为空设置值 if (map != null) map.set(this, value); //如果map为空说明线程中成员变量ThreadLocalMap还没被创建,则创建map else createMap(t, value); } 这个方法主要根据当前线程获取ThreadLocalMap,如果还没初始化则调用createMap(t, value)初始化,反之调用map.set(this, value)设置值。 下面看下getMap(t)的实现 ThreadLocalMap getMap(Thread t) { return t.threadLocals; } 很简单就是获取Thread的成员变量threadLocals 先来看下map为空调用createMap(t, value)去创建ThreadLocalMap的情况: void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } createMap(t, value)直接是new的ThreadLocalMap,ThreadLocalMap构造方法如下: ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { //1.创建和初始化table容量 table = new Entry[INITIAL_CAPACITY]; //2.快速hash获取下标地址 int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); //3.创建Entry,存放第一个数据 table[i] = new Entry(firstKey, firstValue); //4.设置存储个数 size = 1; //5.设置扩容阀值 setThreshold(INITIAL_CAPACITY); } 1.先创建和初始化table容量,table就是存放数据的容器,容器初始容量为16。 2.使用快速hash获取下标地址,这里看下获取firstKey.threadLocalHashCode的代码: private final int threadLocalHashCode = nextHashCode(); private static AtomicInteger nextHashCode = new AtomicInteger(); private static final int HASH_INCREMENT = 0x61c88647; private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } nextHashCode()的方法每次创建ThreadLocal都会加HASH_INCREMENT重新计算threadLocalHashCode的值,HASH_INCREMENT这个魔数的选取与斐波那契散列有关为了让哈希码能均匀的分布在2的N次方的数组里,这里指table数组。 3.创建Entry,存放第一个数据,这里以ThreadLocal自己本身key,Entry继承至WeakReference,代码如下: static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } 由于key是弱引用,所以它的key如果没被强引用会在GC的触发的时候回收掉。 4.设置长度为1。 5.设置扩容阀值,看下实现 private void setThreshold(int len) { threshold = len * 2 / 3; } 扩容阀值的计算是容量大小的2/3是,这里结果是10。 下面看下map.set(this, value)实现 private void set(ThreadLocal<?> key, Object value) { //调用set之前已经做过判断,所以table已经初始化了 Entry[] tab = table; //获取tab的长度 int len = tab.length; //1.快速hash获取下标地址 int i = key.threadLocalHashCode & (len-1); //2.用线性探测法解决冲突 for (Entry e = tab[i]; e != null; //取下个下标值 e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); //3.如果这个key已经存在,重新设置值 if (k == key) { e.value = value; return; } //4.如果key已经过期,则替换这个脏槽 if (k == null) { replaceStaleEntry(key, value, i); return; } } //5.创建Entry tab[i] = new Entry(key, value); //6.存储个数加1 int sz = ++size; //7.清理key已经过期清理的脏槽,如果没脏槽并且存储个数已经大于扩容阀值,则扩容 if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } 1.快速hash获取下标地址。 2.用线性探测法解决冲突,调用nextIndex(i, len)遍历table。我们看下nextIndex(i, len)的源码 private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } 遍历的实现其实设计了一个环,从i开始遍历到达len长度后又开始从0开始。实际上这里用线性探测法解决冲突不会到达len长度,因为在到达之前已经进行了扩容。 3.如果找到的这个key已经存在,重新设置值。 4.如果找到的key已经过期,则替换这个脏槽。 5.创建Entry。 6.存储个数加1。 7.清理key已经过期清理的脏槽,如果未清理到脏槽并且存储个数已经大于扩容阀值,则调用rehash()重hash。 下面来看下replaceStaleEntry(key, value, i)的源码 private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; int len = tab.length; Entry e; int slotToExpunge = staleSlot; //1.向前查找,找到第一个key过期的脏槽 for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i; //2.从staleSlot位置开始向后查找,如果找到key,交换至staleSlot位置的脏槽 for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); //找到key if (k == key) { //交换至staleSlot位置的脏槽 e.value = value; tab[i] = tab[staleSlot]; tab[staleSlot] = e; //如果slotToExpunge == staleSlot,说明前面没有脏槽,直接从i位置开始清理 if (slotToExpunge == staleSlot) slotToExpunge = i; //清理脏槽 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } //如果slotToExpunge == staleSlot,说明前面没有脏槽,直接从i位置开始清理 if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } //3.如果没有找到key,则创建一个新的Entry放至staleSlot位置的脏槽 tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); //4.如果运行过程中有找到脏槽,清理之 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); } 1.从staleSlot位置向前查找,找到第一个key过期的脏槽 2.从staleSlot位置开始向后查找,如果找到key,交换至staleSlot位置的脏槽 3.如果没有找到key,则创建一个新的Entry放至staleSlot位置的脏槽 4.如果运行过程中有找到脏槽,清理之,这里slotToExpunge != staleSlot 成立说明slotToExpunge已经改变说明找到了脏槽。 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len)的作用主要是清理脏槽expungeStaleEntry(slotToExpunge)方法作用的从slotToExpunge位置(包括slotToExpunge)开始清理临近的脏槽。 下面来看下expungeStaleEntry(slotToExpunge)的源码 private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // 1.清理槽 tab[staleSlot].value = null; tab[staleSlot] = null; //存储个数减一 size--; // 2.重hash或清理staleSlot之后的槽,直到空值 Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); //2.1如果轮询到的k为空,则清理之 if (k == null) { e.value = null; tab[i] = null; size--; } else { //2.2重hash,重新设置hash已经改变的Entry的位置 int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } //3.返回清理遍历的最后位置i return i; } 1.清理staleSlot位置的槽。这里清理的逻辑就是将value设置为null并将整个Entry设置为null,以便后续新Entry覆盖使用。 2.重hash或清理staleSlot之后的槽,直到空值。 2.1 如果轮询到的k为空,则清理之。 2.2 重hash,重新设置hash已经改变的Entry的位置。这里i地址有可能是经过线性探测解决的冲突的方式找到的地址,因为前面的槽已经被清理过所以线性探测解决的冲突方法找到的地址可能已经不是i,所以这边需要重新用线性探测解决的冲突方法查找新地址。 3 .返回清理遍历的最后位置i。 总结下expungeStaleEntry(slotToExpunge)逻辑其实不仅仅清理传的slotToExpunge地址的槽也会清理它临近的槽。 拿到清理遍历的最后位置i后会调用cleanSomeSlots(int i, int n)继续从i开始清理脏槽下面来看下的源码: private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { //环形遍历 i = nextIndex(i, len); Entry e = tab[i]; //如果是脏槽,则清理之 if (e != null && e.get() == null) { n = len; removed = true; //最终调用expungeStaleEntry(i)去清理 i = expungeStaleEntry(i); } //log2(n)清理次数 } while ( (n >>>= 1) != 0); return removed; } cleanSomeSlots(int i, int n)主要功能就是从i位置开始遍历log2(n)次去清理槽,为什么是log2(n)次官方给的原因是简单,快速。所以这个方法可能不是清理所有的脏槽,而是简单快速的清理几个脏槽。 下面来看下rehash()方法 private void rehash() { //1.清理所有的脏槽 expungeStaleEntries(); //2.如果清理过后存储个数还是大于扩容阀值的3/4,则扩容 if (size >= threshold - threshold / 4) resize(); } 1.调用expungeStaleEntries()清理所有的脏槽。 2.如果清理过后存储个数还是大于扩容阀值的3/4,则扩容。 下面看下expungeStaleEntries()方法源码 private void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); } } 代码很简单就是遍历所有的table并清理脏槽。 下面看下resize()方法源码 private void resize() { Entry[] oldTab = table; //获取老table容量 int oldLen = oldTab.length; //新table容量扩大2倍 int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; //遍历老的table,对所有Entry重hash定位 for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); //如果遇到脏槽,清理之帮助GC if (k == null) { e.value = null; // Help the GC } else { //重hash int h = k.threadLocalHashCode & (newLen - 1); //线性探测法解决冲突 while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } //重新计算扩容阀值 setThreshold(newLen); //重新设置存储个数 size = count; //重新设置table table = newTab; } resize()实现也比较简单,先创建比原来大2倍的Entry数组,并遍历老的table,对所有Entry重hash定位,如果冲突就是采用线性探测法解决冲突。 get()源码分析 看下get()的源码 public T get() { //获取当前线程 Thread t = Thread.currentThread(); //根据当前线程获取ThreadLocalMap ThreadLocalMap map = getMap(t); //如果不为空获取Entry if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } //为空获取初始化的值 return setInitialValue(); } 这个方法主要根据当前线程获取ThreadLocalMap,如果还没初始化则调用setInitialValue()初始化并返回值,反之调用map.getEntry(this)获取值。 先来看下map不为空调用map.getEntry(this)的源码: private Entry getEntry(ThreadLocal<?> key) { //1.快速hash获取hash地址 int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; //2.如果找到Entry,则返回 if (e != null && e.get() == key) return e; //3.如果未快速找到,则去遍历查找 else return getEntryAfterMiss(key, i, e); } 1.快速hash获取hash地址 2.如果找到Entry,则返回 3.如果未快速找到,则调用getEntryAfterMiss(key, i, e)去遍历查找,由于用线性探测法解决冲突, 来看下getEntryAfterMiss(key, i, e)的源码: private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; //从i位置开始遍历table while (e != null) { ThreadLocal<?> k = e.get(); //如果找到直接返回 if (k == key) return e; //如果是脏槽清理之 if (k == null) expungeStaleEntry(i); //获取下个地址 else i = nextIndex(i, len); e = tab[i]; } return null; } 实现很简单就是从i位置开始遍历table,找到就返回Entry,遍历过程中顺便清理脏槽。 再来看下setInitialValue()的源码: private T setInitialValue() { //1.获取默认初始化值 T value = initialValue(); Thread t = Thread.currentThread(); //2.根据当前线程获取ThreadLocalMap ThreadLocalMap map = getMap(t); //3.不为空,设置值 if (map != null) map.set(this, value); //4.反之初始化map else createMap(t, value); return value; } 1.获取默认初始化值,这里initialValue()是默认返回null的,源码如下: protected T initialValue() { return null; } 这个可以自己实现覆盖原来的方法。 2.根据当前线程获取ThreadLocalMap。 3.不为空,则调用map.set(this, value)设置值。 4.反之则调用createMap(t, value)初始化map。 remove()源码分析 直接看下remove()源码 public void remove() { //1.根据当前线程获取ThreadLocalMap ThreadLocalMap m = getMap(Thread.currentThread()); //2.如果map已经存在则调用m.remove(this)删除值 if (m != null) m.remove(this); } 1.根据当前线程获取ThreadLocalMap 2.如果map已经存在则调用m.remove(this)删除key为本身的Entry 下面来看下m.remove(this)的源码: private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; //快速hash到地址 int i = key.threadLocalHashCode & (len-1); //向后查找 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { //如果找到 if (e.get() == key) { //清理key e.clear(); //清理脏槽 expungeStaleEntry(i); return; } } } 实现很简单,先快速hash到地址i,然后从这个地址i往后查找key(包括地址i)直到槽为空,如果找到则清理之。

优秀的个人博客,低调大师

Java并发编程之Condition源码分析

Condition介绍 上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充。ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活"。Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现。 常用方法 Condition中主要的方法有2个 (1)await()方法可以阻塞当前线程,并释放锁。 (2)在获取锁后可以调用signal()通知被await()阻塞的线程"激活"。 这里的await(),signal()必须在ReentrantLock#lock()和ReentrantLock#unlock()之间调用。 Condition实现分析 Condition的实现也是利用AbstractQueuedSynchronizer队列来实现,await()在被调用后先将当前线程加入到等待队列中,然后释放锁,最后阻塞当前线程。signal()在被调用后会先获取等待队列中第一个节点,并将这个节点转化成ReentrantLock中的节点并加入到同步阻塞队列的结尾,这样此节点的上个节点线程释放锁后会激活此节点线程取来获取锁。 await()方法源码分析 await()源码如下 public final void await() throws InterruptedException { //判断是否当前线程是否被中断中断则抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //加入等待队列 Node node = addConditionWaiter(); //释放当前线程锁 int savedState = fullyRelease(node); int interruptMode = 0; //判断是否在同步阻塞队列,如果不在一直循环到被加入 while (!isOnSyncQueue(node)) { //阻塞当前线程 LockSupport.park(this); //判断是否被中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //获取锁,如果获取中被中断则设置中断状态 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //清除等待队列中被"激活"的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //如果当前线程被中断,处理中断逻辑 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 主要分以下几步 (1)先判断是否当前线程是否被中断中断则抛出中断异常如果未中断调用addConditionWaiter()加入等待队列 (2)调用fullyRelease(node)释放锁使同步阻塞队列的下个节点线程能获取锁。 (3)调用isOnSyncQueue(node)判断是否在同步阻塞队列,这里的加入同步阻塞队列操作是在另一个线程调用signal()后加入,如果不在同步阻塞队列会进行阻塞直到被激活。 (4)如果被激活然后调用checkInterruptWhileWaiting(node)判断是否被中断并获取中断模式。 (5)继续调用isOnSyncQueue(node)判断是否在同步阻塞队列。 (6)是则调用acquireQueued(node, savedState) 获取锁,这里如果获取不到也会被阻塞,获取不到原因是在第一次调用isOnSyncQueue(node)前,可能另一个线程已经调用signal()后加入到同步阻塞队列,然后调用acquireQueued(node, savedState) 获取不到锁并阻塞。acquireQueued(node, savedState)也会返回当前线程是否被中断,如果被中断设置中断模式。 (7)在激活后调用unlinkCancelledWaiters()清理等待队列的已经被激活的节点。 (8)最后判断当前线程是否被中断,如果被中断则对中断线程做处理。 下面来看下addConditionWaiter()实现 private Node addConditionWaiter() { //获取等待队列尾部节点 Node t = lastWaiter; //如果尾部状态不为CONDITION,如果已经被"激活",清理之,然后重新获取尾部节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //创建以当前线程为基础的节点,并将节点模式设置成CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); //如果尾节点不存在,说明队列为空,将头节点设置成当前节点 if (t == null) firstWaiter = node; //如果尾节点存在,将此节点设置成尾节点的下个节点 else t.nextWaiter = node; //将尾节点设置成当前节点 lastWaiter = node; return node; } addConditionWaiter()的逻辑很简单,就是创建以当前线程为基础的节点并把节点加入等待队列的尾部待其他线程处理。 下面来看下fullyRelease(Node node)实现 final int fullyRelease(Node node) { boolean failed = true; try { //获取阻塞队列中当前线程节点的锁状态值 int savedState = getState(); //释放当前线程节点锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { //释放失败讲节点等待状态设置成关闭 if (failed) node.waitStatus = Node.CANCELLED; } } 调用getState()先获取阻塞队列中当前线程节点的锁状态值,这个值可能大于1表示多次重入,然后调用release(savedState)释放所有锁,如果释放成功返回锁状态值。 下面来看下isOnSyncQueue(Node node)实现 final boolean isOnSyncQueue(Node node) { //判断当前节点是否是CONDITION或者前置节点是否为空如果为空直接返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果下个节点存在,则在同步阻塞队列中返回true if (node.next != null) // If has successor, it must be on queue return true; //遍历查找当前节点是否在同步阻塞队列中 return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } 此方法的功能是查找当前节点是否在同步阻塞队列中,方法先是快速判断,判断不了再进行遍历查找。 (1)第一步先判断次节点是否CONDITION状态或者前置节点是否存在,如果是表明不在队列中返回false,阻塞队列中的状态一般是0或者SIGNAL状态而且如果当前如果当前节点在队列阻塞中且未被激活前置节点一定不为空。 (2)第二步判断节点的下个节点是否存在,如果存在则表明当前当前节点已加入到阻塞队列中。 (3)如果以上2点都没法判断,也有可能刚刚加入到同步阻塞队列中,所以调用findNodeFromTail(Node node)做最后的遍历查找。查找从队列尾部开始查,从尾部开始查的原因是可能刚刚加入到同步阻塞队列中,从尾部能快速定位。 下面看下checkInterruptWhileWaiting(Node node)实现 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; } 此方法在线程被激活后被调用,主要功能就是判断被激活的线程是否被中断。此方法会返回2种中断状态THROW_IE和REINTERRUPT,THROW_IE是调用signal()前被中断返回,REINTERRUPT在调用signal()后被中断返回。 此方法先判断是否被标记中断,是的话再调用transferAfterCancelledWait(node)取判断是那种中断状态,transferAfterCancelledWait(node)方法分2步 (1)用CAS方式将节点状态改错等待状态改成CONDITION,并加入到同步阻塞队列中返回true (2)如果不能加入到同步阻塞队列就自旋一直等待加入 如果使用await()方法上面2步其实是没什么作用其最后一定会返回false,因为await()被激活只能调用 signal()方法,而signal()方法肯定已经将节点加入到同步阻塞队列中。所以以上逻辑是给await(long time, TimeUnit unit)等带超时激活方法用的。 acquireQueued(node, savedState)方法再上一章节已经讲过这边就不重复了,下面分析下unlinkCancelledWaiters()方法 private void unlinkCancelledWaiters() { //获取等待队列头节点 Node t = firstWaiter; Node trail = null; while (t != null) { //获取下个节点 Node next = t.nextWaiter; //如果状态不为CONDITION说明已经加入阻塞队列需要清理掉 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else //获取下个节点 trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } 此方法就是从头开始查找状态不为CONDITION的节点并清理,状态不为CONDITION节点说明此节点已经加入到阻塞队列,已经不需要维护。 下面来看下reportInterruptAfterWait(interruptMode)方法 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //如果是THROW_IE模式直接抛出异常 if (interruptMode == THROW_IE) throw new InterruptedException(); //如果是REINTERRUPT模式标记线程中断由上层处理中断 else if (interruptMode == REINTERRUPT) selfInterrupt(); } 此方法处理中断逻辑。如果是THROW_IE模式直接抛出异常,如果是REINTERRUPT模式标记线程中断由上层处理中断。 signal()方法源码分析 signal()源码如下 public final void signal() { //是否当前线程持有锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //通知"激活"头节点线程 if (first != null) doSignal(first); } 先调用isHeldExclusively()判断锁是否被当前线程持有,然后检查等待队列是否为空,不为空就是可以取第一个节点调用doSignal(first)去"激活",这里激活不是真正的激活而只是将节点加入到同步阻塞队列尾部,所以上下文中带""的激活都是这种解释。 下面看下isHeldExclusively()实现 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } 实现就是比较下当前线程和持有锁的线程是否同一个 下面看下doSignal(first)的实现 private void doSignal(Node first) { do { //头指头后移一位,如果后面的节点为空,则将尾指头也指向空,说明队列为空了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //清空头节点的下个节点 first.nextWaiter = null; //如果"激活"失败者取下个继续,直到成功或者遍历完 } while (!transferForSignal(first) && (first = firstWaiter) != null); } 此方法就是取当前头节点一直去尝试"激活",直到成功或者遍历完。 下面来看下transferForSignal(first)方法 final boolean transferForSignal(Node node) { //将CONDITION状态设置成0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //加入到同步阻塞队列 Node p = enq(node); int ws = p.waitStatus; //状态异常直接激活 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } (1)此方法先先将CONDITION状态设置成0,因为如果是CONDITION状态加入到同步阻塞队列,激活的时候是不识别的。 (2)加入到同步阻塞队列的尾部。所以同步阻塞队列中前面如果有多个在排队,调用unlock()不会马上激活此节点。 (3)状态异常直接调用unpark激活,这边按理说如果状态异常情况下激活,await()在调用unlock()被激活后会进行相应的异常处理,但看await()代码没有处理则是正常执行。 这个方法主要就是把节点加入到同步阻塞队列的,真正的激活则是调用unlock()去处理。

优秀的个人博客,低调大师

Java并发编程之ReentrantLock源码分析

ReentrantLock介绍 从JDK1.5之前,我们都是使用synchronized关键字来对代码块加锁,在JDK1.5引入了ReentrantLock锁。在JDK1.6之前synchronized关键字性能比ReentrantLock锁要差,JDK1.6之后性能基本是持平,但ReentrantLock锁功能要比synchronized关键字功能强大。 特点 synchronized关键字和ReentrantLock锁都是重入锁,可重入锁是指当一个线程获取到锁后,此线程还可继续获得这把锁,在此线程释放这把锁前其他线程则不可获得这边锁。相比synchronized关键字,ReentrantLock锁具有锁获取超时和获取锁响应中断的特点。ReentrantLock锁还分公平锁和非公平锁,公平锁模式是按线程调用加锁的先后排队顺序获取锁,非公平锁模式是已经在排队中的线程按顺序获取锁,但是新来的线程会和排队中的线程进行竞争,并不保证先排先获取锁。 ReentrantLock 源码分析 ReentrantLock实现了java.util.concurrent.locks.Lock接口和java.io.Serializable接口,前者是对实现Java锁的一种规范,后者说明ReentrantLock可以序列化。 ReentrantLock定义了一个成员变量 private final Sync sync; Sync类型是ReentrantLock的内部类,继承至AbstractQueuedSynchronizer ,AbstractQueuedSynchronizer是一个带空头的双向列表,为ReentrantLock的锁排队提供了基础支持。 ReentrantLock的UML关系图如下 下面我们解析下ReentrantLock中几个常用方法。 lock()方法源码分析 lock()是ReentrantLock中最常用的方法,用来对代码块加锁。lock()先是调用Sync的lock()的方法,Sync#lock()实现分为非公平模式和公平模式,我们对这2个模式分别讲解 非公平模式 Sync#lock()非公平模式代码如下: final void lock() { //用CAS方法设置枷锁状态 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //抢锁失败,进入后续逻辑。 acquire(1); } 新来线程先调用compareAndSetState(0, 1)方法用CAS方法设置加锁状态,这里是非公平模式实现要点,这样做主要是为了新来的线程和排队中的线程竞争,排队中的线程激活后也会用CAS方法设置加锁状态,就是看哪个线程线程抢的快,哪个能拿到锁。如果设置加锁状态成功,则设置AbstractQueuedSynchronizer中的全局变量线程为当前当前线程。如果设置加锁状态失败即抢锁失败,则调用acquire(1)进入排队逻辑。 AbstractQueuedSynchronizer#acquire(int arg)实现代码如下: public final void acquire(int arg) { //先调用tryAcquire(arg)再试下能不能获取到锁,无法获取则调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)进入排队 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 先调用tryAcquire(arg)再试下能不能获取到锁,获取成功则执行结束,无法获取则调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)进入排队,此方法返回参数为是否中断当前线程,排队过程中如果线程被中断则会返回ture,此时调用selfInterrupt()中断当前线程。 tryAcquire(arg)直接调用了非公平模式nonfairTryAcquire(acquires)方法我们看下实现: final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取锁状态 int c = getState(); //状态未加锁则尝试获取锁 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //判断是否是相同线程,如果是则表示当前线程的锁重入了 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 调用getState()方法获取加锁状态,如果为0表示当前未被加锁,尝试CAS设置加锁状态获取锁,如果成功同样设置AbstractQueuedSynchronizer中的全局变量线程为当前当前线程。如果已被加锁,这判断当前线程和加锁线程是否是同一线程,如果是同一线程则将获取锁的状态加1返回获取锁成功,这里就是可重入锁实现的核心,状态的值表示当前线程重入了多少次,之后的释放锁就要释放相同的次数。 接下来我们看下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,acquireQueued主要功能是对当前线程阻塞,阻塞到能被上个获取到锁线程释放为止,addWaiter(Node.EXCLUSIVE)则是将当前线程加入到排队队列中。 我们先来看下addWaiter(Node.EXCLUSIVE)实现 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //CAS快速添加节点到尾部 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果尾节点不存在或者添加失败走最大努力添加节点逻辑 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; //如果头尾节点为空则创建空节点当头尾节点 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //CAS添加节点到尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 创建已当前线程为基础的节点,先走快速添加到尾部逻辑,获取尾节点如果尾节点存在,将当前节点和尾节点相连,并用CAS方式将当前节点设置为尾节点,这边使用CAS方式考虑了多个线程同时操作尾节点的情况,所以如果尾节点已经变更则快速添加节点操作失败,调用enq(node)方法走最大努力添加节点的逻辑。enq(node)最大努力添加逻辑就是一直添加节点直到添加节点到尾部成功。 下面看下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的实现 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } acquireQueued里有个循环,这个循环的主要作用就是在线程激活后重试获取锁直到获取锁。node.predecessor()获取当前线程节点的前一个节点,如果是头节点,则当前线程尝试获取锁,获取锁成功设置当前节点为头节点。如果获取失败或者非头节点则调用shouldParkAfterFailedAcquire(p, node)判断是否需要阻塞等待,如果需要阻塞等待则调用parkAndCheckInterrupt()阻塞当前线程并让出cup资源直到被前一个节点激活,继续循环逻辑。 我们先来看下shouldParkAfterFailedAcquire(p, node)的实现 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 先获取前个节点的状态,状态分以下4类 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; 除了CANCELLED关闭状态是非正常,其他状态均正常状态。判断当前状态是否是SIGNAL正常状态,如果是就返回成功,这样当前线程就可以阻塞安心的等待上个节点的激活。如果状态为CANCELLED关闭状态则删除所有当前节点之前状态为CANCELLED的节点,返回失败让当前线程重试获取锁,如果是初始化0状态则CAS方式设置状态为SIGNAL。 接下来看下阻塞方法parkAndCheckInterrupt() private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 方法很简单调用LockSupport.park(this)阻塞当前线程,这里要讲下方法返回时调用Thread.interrupted()判断当前线程是否被中断,如果被中断的话,当前线程获取到锁后会调用Thread.currentThread().interrupt()中断线程。 公平模式 公平模式和非公平模式大部分代码相同,主要是获取锁的逻辑不同,我们就讲下代码不同的部分 lock()代码如下 final void lock() { acquire(1); } 非公平模式模式先尝试设置状态来获取锁,而公平模式则直接调用acquire(1)去走排队逻辑。 尝试获取锁的方法tryAcquire(int acquires)也不一样代码如下 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 该方法跟非公平锁基本都一样,只是在获取锁的时候加了hasQueuedPredecessors()判断,这个方法主要判断了当前线程是否在头节点的下个节点,这样保证了获取锁的顺序性。 unlock()方法源码分析 unlock()方法比较简单,直接调用sync.release(1)方法。 release(1)代码如下 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 先尝试释放锁,如果释放产品这判断当前节点是否为0不为0调用unparkSuccessor(h)方法激活下个节点的线程,否则直接返回。这里会有个疑问为什么h.waitStatus为0不去激活下个节点的线程,如果不激活下个节点的线程是否一直阻塞的,答案是否定的。这样做主要是为了释放锁的效率。waitStatus为0是初始化的值,这个值还没被下个节点线程调用shouldParkAfterFailedAcquire(p, node)方法设置成SIGNAL状态,也就说明下个节点线程还没被阻塞,此时如果下个节点线程调用此方法并设置成SIGNAL状态,势必它会重新获取锁,从而获取到锁避免了上述的问题。 下面来看下tryRelease(arg)方法 protected final boolean tryRelease(int releases) { //重入次数减1 int c = getState() - releases //非持有线程抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果释放了所有的重入次则清理持有线程为空 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //设置当前剩余的重入次数 setState(c); return free; } 因为锁可重入,因此调用getState()获取状态的值并减去一次重入次数,得到的c就是剩余重入的次数,然后判断当前释放的线程是否是当前占有锁的线程,如果不是抛出异常,否则先判断c是否为0表示当前线程持有的锁是否释放完全,如果是则设置持有锁的线程的变量为空,并设置锁状态为0,否则设置剩余的c到锁的状态。 接下来看下unparkSuccessor(h)的实现 private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //查找下个正常状态的节点去激活 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //激活线程 LockSupport.unpark(s.thread); } 获取当前节点状态,设置如果当前节点正常情况则设置成0,然后取当前节点的下个节点,如果下个节点状态非正常即CANCELLED状态,则从队列的尾部开始查找查到最靠近当前的节点且状态正常的节点,然后调用LockSupport.unpark(s.thread)通知此节点停止阻塞。这边会有个疑问如果调用LockSupport.unpark(s.thread)方法后,此节点才调用LockSupport.park(this)去阻塞,这样会不会发生此节点永久阻塞的问题,答案是否定的,LockSupport.unpark(s.thread)方法的实现其实是为线程设置了一个信号量,LockSupport.park(this)就算后调,如果线程相同也会收到此信号从而激活线程,这里的实现原理就不展开讲。

优秀的个人博客,低调大师

并发容器与框架——Fork/Join框架

1. Fork/Join框架概念 Fork/Join框架是Java提供的一个用于并行执行任务的框架,它会将一个大任务分成多个小任务,并且将每个小任务的最终结果汇总得到大任务结果的框架。比如对1+2+3+····+100求和,可以分成十个子任务分别对10个数求和,最后再汇总这十个子任务的结果。 2.工作窃取算法 工作窃取算法是指某个线程从其他任务队列里窃取任务来执行。 假如我们可以将一个总任务分割成多个互不相干的子任务,为了减少线程的竞争,我们会将这些子任务放在不同的队列中,并为每个队列都建造一个线程执行该队列的任务,线程和队列一一对应。但是有些线程可能会很早的执行完自己队列中的所有任务,而其他线程还会处理自己拥有的队列中的任务,此时已处理完任务的线程与其等待其他线程执行任务,不如帮助其他线程一起执行剩余任务。这时他们会从其他线程的队列里窃取一个线程执行任务,所以为了避免因为工作窃取引起的两个线程之间的竞争,通常任务队列会使用双端队列。任务队列线程从头部取任务,窃取线程从尾部取任务。 优点是充分利用线程进行并行运算,并减少了竞争,缺点就是还是存在竞争情况,比如队列中任务数为1时,还会因为创建多个线程和队列造成更多的资源消耗。 3. Fork/Join框架设计实现类 通过前面的介绍我们可以了解到Fork/Join框架主要实现两个步骤:分割任务以及执行任务并汇总结果。 分割任务:我们需要一个Fork类来分割任务,并且要将大任务分割的足够小。 执行任务并汇总结果:分割的子任务分别放在双端队列里,然后几个启动线程分 别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程 从队列里拿数据,然后合并这些数据。 Fork/Join框架设计了两个类来完成以上两个步骤的功能。 ForkJoinTask:这是一个抽象类,主要使用该类来创建一个ForkJoin任务,它提供在任务中执行fork()和join()操作的机制。但Fork/Join框架提供了ForkJoinTask的两个抽象子类,RecursiveAction(用于没有返回结果的任务);RecursiveTask(用于有返回结果的任务)。通过继承这两个子类来创建一个ForkJoin任务。 ②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。 4.简单使用Fork/Join框架 用Fork/Join框架来计算一个1+2+3+····+n的结果。注意,我们必须先想好如何分割任务,分割计算1+2+3+····+n时,必须至少分割为两个数字一组才能计算,但如果n过大两个数字一组会造成极大资源消耗,所以应考虑好任务分割的程度。此处仅以1+2+3+4为例。 public class TestFork_JoinFrame extends RecursiveTask<Integer>{ private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加 private int start; private int end; public TestFork_JoinFrame(int s,int e) { start=s; end=e; } public static void main(String[] args) { ForkJoinPool pool=new ForkJoinPool(); TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4); //执行任务 Future<Integer> f=pool.submit(task); try { System.out.println("最终结果:"+f.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override protected Integer compute() { int sum=0; boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割 if(canCompute){ for(int i=start;i<=end;i++){ sum+=i; } }else{ //分隔成两个子任务 int middle=(start+end)/2; TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle); TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end); //执行子任务 leftTask.fork(); rightTask.fork(); //等待子任务完成并获取结果汇总 sum=leftTask.join()+rightTask.join(); } return sum; } } 5.异常处理 ForkJoinTask在执行过程中可能会抛出异常,但是与普通线程任务一样,我们无法在主线程中对其进行捕获,所以以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如 果任务没有完成或者没有抛出异常则返回null。 public class TestFork_JoinFrame extends RecursiveTask<Integer>{ private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加 private int start; private int end; public TestFork_JoinFrame(int s,int e) { start=s; end=e; } public static void main(String[] args) { ForkJoinPool pool=new ForkJoinPool(); TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4); //执行任务 Future<Integer> f=pool.submit(task); try { if(task.isCompletedAbnormally()){ System.out.println(task.getException()); }else{ System.out.println("最终结果:"+f.get()); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override protected Integer compute() { int sum=0; boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割 if(canCompute){ for(int i=start;i<=end;i++){ sum+=i; } }else{ //分隔成两个子任务 int middle=(start+end)/2; TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle); TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end); //执行子任务 leftTask.fork(); rightTask.fork(); //等待子任务完成并获取结果汇总 sum=leftTask.join()+rightTask.join(); } return sum; } } } 6.框架实现原理 ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。 1.ForkJoinTask的fork()方法原理:调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。源码如下 public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的 signalWork()方法唤醒或创建一个工作线程来执行任务。源码码如下。 final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } } 2.ForkJoinTask的join方法实现原理:Join方法的主要作用是阻塞当前线程并等待获取结果。源码如下 public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } 首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务状态是被取消,则直接抛出CancellationException。如果任务状态是抛出异常,则直接抛出对应的异常。 doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

用户登录
用户注册