您现在的位置是:首页 > 文章详情

并发容器与框架——Fork/Join框架

日期:2018-09-03点击:501

1. Fork/Join框架概念

    Fork/Join框架是Java提供的一个用于并行执行任务的框架,它会将一个大任务分成多个小任务,并且将每个小任务的最终结果汇总得到大任务结果的框架。比如对1+2+3+····+100求和,可以分成十个子任务分别对10个数求和,最后再汇总这十个子任务的结果。

2.工作窃取算法

    工作窃取算法是指某个线程从其他任务队列里窃取任务来执行。

    假如我们可以将一个总任务分割成多个互不相干的子任务,为了减少线程的竞争,我们会将这些子任务放在不同的队列中,并为每个队列都建造一个线程执行该队列的任务,线程和队列一一对应。但是有些线程可能会很早的执行完自己队列中的所有任务,而其他线程还会处理自己拥有的队列中的任务,此时已处理完任务的线程与其等待其他线程执行任务,不如帮助其他线程一起执行剩余任务。这时他们会从其他线程的队列里窃取一个线程执行任务,所以为了避免因为工作窃取引起的两个线程之间的竞争,通常任务队列会使用双端队列。任务队列线程从头部取任务,窃取线程从尾部取任务。

    优点是充分利用线程进行并行运算,并减少了竞争,缺点就是还是存在竞争情况,比如队列中任务数为1时,还会因为创建多个线程和队列造成更多的资源消耗。

3. Fork/Join框架设计实现类

    通过前面的介绍我们可以了解到Fork/Join框架主要实现两个步骤:分割任务以及执行任务并汇总结果。

  1. 分割任务:我们需要一个Fork类来分割任务,并且要将大任务分割的足够小。
  2. 执行任务并汇总结果:分割的子任务分别放在双端队列里,然后几个启动线程分 别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程 从队列里拿数据,然后合并这些数据。

    Fork/Join框架设计了两个类来完成以上两个步骤的功能。

  1. ForkJoinTask:这是一个抽象类,主要使用该类来创建一个ForkJoin任务,它提供在任务中执行fork()和join()操作的机制。但Fork/Join框架提供了ForkJoinTask的两个抽象子类, RecursiveAction(用于没有返回结果的任务); RecursiveTask(用于有返回结果的任务)。通过继承这两个子类来创建一个ForkJoin任务。
  2. ②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

4.简单使用Fork/Join框架

    用Fork/Join框架来计算一个1+2+3+····+n的结果。注意,我们必须先想好如何分割任务,分割计算1+2+3+····+n时,必须至少分割为两个数字一组才能计算,但如果n过大两个数字一组会造成极大资源消耗,所以应考虑好任务分割的程度。此处仅以1+2+3+4为例。

public class TestFork_JoinFrame extends RecursiveTask<Integer>{ private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加 private int start; private int end; public TestFork_JoinFrame(int s,int e) { start=s; end=e; } public static void main(String[] args) { ForkJoinPool pool=new ForkJoinPool(); TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4); //执行任务 Future<Integer> f=pool.submit(task); try { System.out.println("最终结果:"+f.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override protected Integer compute() { int sum=0; boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割 if(canCompute){ for(int i=start;i<=end;i++){ sum+=i; } }else{ //分隔成两个子任务 int middle=(start+end)/2; TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle); TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end); //执行子任务 leftTask.fork(); rightTask.fork(); //等待子任务完成并获取结果汇总 sum=leftTask.join()+rightTask.join(); } return sum; } } 

5.异常处理

    ForkJoinTask在执行过程中可能会抛出异常,但是与普通线程任务一样,我们无法在主线程中对其进行捕获,所以以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如 果任务没有完成或者没有抛出异常则返回null。

public class TestFork_JoinFrame extends RecursiveTask<Integer>{ private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加 private int start; private int end; public TestFork_JoinFrame(int s,int e) { start=s; end=e; } public static void main(String[] args) { ForkJoinPool pool=new ForkJoinPool(); TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4); //执行任务 Future<Integer> f=pool.submit(task); try { if(task.isCompletedAbnormally()){ System.out.println(task.getException()); }else{ System.out.println("最终结果:"+f.get()); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override protected Integer compute() { int sum=0; boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割 if(canCompute){ for(int i=start;i<=end;i++){ sum+=i; } }else{ //分隔成两个子任务 int middle=(start+end)/2; TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle); TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end); //执行子任务 leftTask.fork(); rightTask.fork(); //等待子任务完成并获取结果汇总 sum=leftTask.join()+rightTask.join(); } return sum; } } }

6.框架实现原理

    ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

   1.ForkJoinTask的fork()方法原理:调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。源码如下

public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }

      pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的 signalWork()方法唤醒或创建一个工作线程来执行任务。源码码如下。

 final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }

    2.ForkJoinTask的join方法实现原理:Join方法的主要作用是阻塞当前线程并等待获取结果。源码如下

public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
 private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }

        首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务状态是被取消,则直接抛出CancellationException。如果任务状态是抛出异常,则直接抛出对应的异常。

        doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

原文链接:https://yq.aliyun.com/articles/633779
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章