并发容器与框架——Fork/Join框架
1. Fork/Join框架概念
Fork/Join框架是Java提供的一个用于并行执行任务的框架,它会将一个大任务分成多个小任务,并且将每个小任务的最终结果汇总得到大任务结果的框架。比如对1+2+3+····+100求和,可以分成十个子任务分别对10个数求和,最后再汇总这十个子任务的结果。
2.工作窃取算法
工作窃取算法是指某个线程从其他任务队列里窃取任务来执行。
假如我们可以将一个总任务分割成多个互不相干的子任务,为了减少线程的竞争,我们会将这些子任务放在不同的队列中,并为每个队列都建造一个线程执行该队列的任务,线程和队列一一对应。但是有些线程可能会很早的执行完自己队列中的所有任务,而其他线程还会处理自己拥有的队列中的任务,此时已处理完任务的线程与其等待其他线程执行任务,不如帮助其他线程一起执行剩余任务。这时他们会从其他线程的队列里窃取一个线程执行任务,所以为了避免因为工作窃取引起的两个线程之间的竞争,通常任务队列会使用双端队列。任务队列线程从头部取任务,窃取线程从尾部取任务。
优点是充分利用线程进行并行运算,并减少了竞争,缺点就是还是存在竞争情况,比如队列中任务数为1时,还会因为创建多个线程和队列造成更多的资源消耗。
3. Fork/Join框架设计实现类
通过前面的介绍我们可以了解到Fork/Join框架主要实现两个步骤:分割任务以及执行任务并汇总结果。
- 分割任务:我们需要一个Fork类来分割任务,并且要将大任务分割的足够小。
- 执行任务并汇总结果:分割的子任务分别放在双端队列里,然后几个启动线程分 别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程 从队列里拿数据,然后合并这些数据。
Fork/Join框架设计了两个类来完成以上两个步骤的功能。
- ForkJoinTask:这是一个抽象类,主要使用该类来创建一个ForkJoin任务,它提供在任务中执行fork()和join()操作的机制。但Fork/Join框架提供了ForkJoinTask的两个抽象子类, RecursiveAction(用于没有返回结果的任务); RecursiveTask(用于有返回结果的任务)。通过继承这两个子类来创建一个ForkJoin任务。
- ②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
4.简单使用Fork/Join框架
用Fork/Join框架来计算一个1+2+3+····+n的结果。注意,我们必须先想好如何分割任务,分割计算1+2+3+····+n时,必须至少分割为两个数字一组才能计算,但如果n过大两个数字一组会造成极大资源消耗,所以应考虑好任务分割的程度。此处仅以1+2+3+4为例。
public class TestFork_JoinFrame extends RecursiveTask<Integer>{ private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加 private int start; private int end; public TestFork_JoinFrame(int s,int e) { start=s; end=e; } public static void main(String[] args) { ForkJoinPool pool=new ForkJoinPool(); TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4); //执行任务 Future<Integer> f=pool.submit(task); try { System.out.println("最终结果:"+f.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override protected Integer compute() { int sum=0; boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割 if(canCompute){ for(int i=start;i<=end;i++){ sum+=i; } }else{ //分隔成两个子任务 int middle=(start+end)/2; TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle); TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end); //执行子任务 leftTask.fork(); rightTask.fork(); //等待子任务完成并获取结果汇总 sum=leftTask.join()+rightTask.join(); } return sum; } }
5.异常处理
ForkJoinTask在执行过程中可能会抛出异常,但是与普通线程任务一样,我们无法在主线程中对其进行捕获,所以以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如 果任务没有完成或者没有抛出异常则返回null。
public class TestFork_JoinFrame extends RecursiveTask<Integer>{ private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加 private int start; private int end; public TestFork_JoinFrame(int s,int e) { start=s; end=e; } public static void main(String[] args) { ForkJoinPool pool=new ForkJoinPool(); TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4); //执行任务 Future<Integer> f=pool.submit(task); try { if(task.isCompletedAbnormally()){ System.out.println(task.getException()); }else{ System.out.println("最终结果:"+f.get()); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override protected Integer compute() { int sum=0; boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割 if(canCompute){ for(int i=start;i<=end;i++){ sum+=i; } }else{ //分隔成两个子任务 int middle=(start+end)/2; TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle); TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end); //执行子任务 leftTask.fork(); rightTask.fork(); //等待子任务完成并获取结果汇总 sum=leftTask.join()+rightTask.join(); } return sum; } } }
6.框架实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
1.ForkJoinTask的fork()方法原理:调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。源码如下
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的 signalWork()方法唤醒或创建一个工作线程来执行任务。源码码如下。
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
2.ForkJoinTask的join方法实现原理:Join方法的主要作用是阻塞当前线程并等待获取结果。源码如下
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务状态是被取消,则直接抛出CancellationException。如果任务状态是抛出异常,则直接抛出对应的异常。
doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
使用WebRTC和WebVR进行VR视频通话
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/vn9PLgZvnPs1522s82g/article/details/82392695 本文来自Google的开发专家Dan Jenkins,他喜欢将最新的Web API与RTC应用程序混合在一起。他还在Nimble Ape经营自己的咨询和开发公司。本文中,他给出了一个代码实现——通过使用WebVR将FreeSWITCH Verto WebRTC视频会议转换为虚拟现实会议的。LiveVideoStack对原文进行了摘译。 文 / Dan Jenkins 译 / 元宝 审校 / Ant 原文 : https://webrtchacks.com/webrtc-meets-webvr/ WebRTC不是Web平台上唯一流行的媒体API。几年前推出了Web虚拟现实(WebVR)规范,以便在Web浏览器中为虚拟现实设备提供支持。此后,它已移植到较新的WebXR设备API规范了。 今年早些时候在ClueCon,Dan Jenkins在演讲中表示,使用FreeSWITCH将WebRTC视频会议流添加到...
- 下一篇
深入详解Java线程池——ThreadPoolExecutor
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。 1.线程池实现原理 1.1线程池处理流程 首先,我们需要了解当提交一个任务给线程池之后,线程池的处理流程: 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。 线程池判断线程池的线程数量是否为最大线程数量且每个线程都处于工作状态。如果不是,则创建一个新的工作线程来执行任务。如果是,则交给饱和策略来处理这个任务。 以ThreadPo...
相关文章
文章评论
共有0条评论来说两句吧...