java8学习:ForkJoin
内容来自《 java8实战 》,本篇文章内容均为非盈利,旨为方便自己查询、总结备份、开源分享。如有侵权请告知,马上删除。
书籍购买地址:java8实战
- 这篇是接上一篇并行数据处理与性能余下的问题:forkjoin进行讲解的
- forkjoin的目的就是以递归的方式来拆分更小的任务,然后将每个小任务处理后的结果在合并,fork就是拆分,join就是合并
-
RecursiveTask
- 要把任务提交到这个池,就必须创建 RecursiveTask的子类,R就是你需要返回的结果类型,如果不返回结果就使用RecursiveAction类型,继承这个抽象类只需要实现一个方法
protected abstract V compute();
- 此方法定以了将任务拆分成子任务的逻辑,以及无法再才分或不方便拆分时生成单个子任务结果的逻辑,(它定义了咋拆分又定义了怎么把子任务聚合)
-
先实现一下,再来说原理
- 还是实现1到一千万的累加和
public class ForkJoinImpl extends java.util.concurrent.RecursiveTask<Long> { //临界值,就是结束值减开始值的结果如果小于这个值那么就不拆分了,大于这个值才会拆分 private final int MEDIAN_NUM = 100000; //从多少计算 private int start_num = 0; //计算到多少 private int end_num = 0; //构造 public ForkJoinImpl(int start_num, int end_num) { this.start_num = start_num; this.end_num = end_num; } @Override protected Long compute() { //结束值减开始值的结果 int temp = end_num - start_num; //判断结束值减开始值的结果是否小于上面定义的临界值 if (temp <= MEDIAN_NUM){ //如果小的话,那么就不进行拆分了,就直接调用方法开始计算 return sequentiallySum(); } //到这就代表结束值减开始值的结果是大于临界值的 //继续进行拆分 //start_num到start_num + temp / 2是把数据的左半部分形成一个新的task //比如0到10,那么就是 10-0=10,temp=10,start_num=0,所以形成的新task就是(0,10/2=5),也就是左半部分 ForkJoinImpl leftTask = new ForkJoinImpl(start_num,start_num + temp / 2); //利用ForkJoinPool中的线程异步执行新创建的子任务 leftTask.fork(); //这创建的就是数据的后半段,start_num + temp / 2 = 0+10/2 = 6,所以形成的新task就是(0+10/2=6,10),也就是右半部分 ForkJoinImpl rightTask = new ForkJoinImpl(start_num + temp / 2,end_num); //同时执行第二个子任务,有可能允许进一步划分 Long rightResult = rightTask.compute(); //读取第一个子任务的结果,如果没有完成就等待 Long leftResult = leftTask.join(); //该任务的结果是两个子任务结果的组合 return rightResult + leftResult; } //计算方法:在不能进行拆分的时候进行计算 private Long sequentiallySum(){ long sum = 0; for (int i = start_num; i <= end_num; i++) { sum += i; } return sum; } } @Test public void test() throws Exception { ForkJoinImpl forkJoin = new ForkJoinImpl(0, 10000000); Long invoke = new ForkJoinPool().invoke(forkJoin); System.out.println("invoke = " + invoke); }
- 上面的流程的总结:当把ForkJoinImpl对象传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务的compute方法,该方法会检查任务是够孝道足以顺序执行,也就是我们上面定义的临界值,如果不够小就会要求再次拆分数据,并分给一个新的ForkJoinImpl,新的ForkJoinImpl也是由pool安排执行。因此这个过程是递归重复的,把缘任务拆分为更小的任务。这时候达到临街值要求后,会顺序计算每个任务的结果,然后由分支过程创建的任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。如下面的图
-
使用此框架的最佳做法
- 对一个任务嗲用join方法会阻塞调用方,直到该任务做出结果,因此有必要在两个子任务的计算都开始之后在调用它,否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须去等待另一个任务完成才能启动
- 对子任务调用fork方法可以把他排进pool,同时对左边和右边的子任务调用它似乎很自然,但这样做的效率比直接对其中一个调用compute低,这样做可以以为其中一个子任务重用同一个线程,从而避免在线程池中多分配一个任务造成的开销。比如有些写
//执行子任务 left.fork(); right.fork(); //获取子任务结果 int lResult = left.join(); int rResult = right.join(); 而更高效的写法是 leftTask.fork(); ForkJoinImpl rightTask = new ForkJoinImpl(start_num + temp / 2,end_num); //自己理解的是它一直判断是否有大于临界值,如果有就左半部分进行加入pool,而右边一直去判断是否还有大于临界值的情况 //而不至于两方都在等待,并且上面的写法需要left返回后右边才会返回。如果理解的不对请评论指正 Long rightResult = rightTask.compute(); Long leftResult = leftTask.join();
-
工作窃取
- 对于上面会产生很多的小任务然后分配到内核上进行计算,理想情况下是所有的小任务机会全部同时执行完毕,但是由于线程是随意切换的,这种情况会发生改变:可能A已经空闲了,但是B还是忙的要死,为了解决这样的一个问题,此框架就有工作窃取的技术了
- 在应用中,这意味着这些任务拆不多被平均分配到pool中的所有的线程上。但每个线程都为分配给他的任务保存一个双向队列,每完成一个任务,就会从队列头上取出下一个任务开始执行,当A已经忙完,B还在忙的时候,A并不会闲下来,而是随机选一个别的线程,从队列的尾巴上偷走一个任务,这个过程一直继续下去,直到所有的任务执行完毕,所有队列都情况,这就是为什么不划分为几个大任务而是很多小任务,这样有助于更好的在工作线程之间平衡负载

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
震惊,20年开发经验的技术总监不会搭建Java开发环境
公司9月份空降一位技术总监,个子不高,头顶有点秃。说话老是中文中混夹一点English(这个只需要加一个parameter就好了;这个简单,你只需要把它hide住)。之前找我谈话也是气场十足:“我之前在很多家公司都是担任技术总监这个职位,之前很多年也是一直担任技术总监这个职位。”于是我深深的相信,他的确是一个很厉害的人物,相信我们技术部在他的带领下会越来越好,为公司作出更大的贡献! 然而,我错了。说一下他的牛逼之处吧,随便说两点 1、 他曾说:“我写过的代码比你吃的米饭还多”。可能是他笔记本太贵了吧,导致他一直没能成功搭建java开发环境。私底下把所有后台开发人员找了一遍求助,只有一位新来的同事抹不开面子,给他安装好了。后面还给别人穿小鞋。兄弟,这真是一个悲伤的故事 。 2、 有一天,一位后台兄弟在调试接口,参数没有拿到。这位总监一看,大喜。这不是指点他人的机会(装逼)到了么!你这个ajax写法有问题,你看这个参数,json字符串是没有大括号的。 3、 来公司第三天下午,大家正在调试开发。突然,测试环境崩了。过了一会,群里出现了一行字,不好意思,我把测试库删掉了,呵呵。 4、有一天,运...
- 下一篇
java8学习:并行数据处理与性能
内容来自《 java8实战 》,本篇文章内容均为非盈利,旨为方便自己查询、总结备份、开源分享。如有侵权请告知,马上删除。书籍购买地址:java8实战 在java7之前实现并行处理数据集合非常麻烦 得明确的把包含数据的数据结构分成若干子部分 要给每个子部分分配一个独立的线程 在恰当的时候对他们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把结果汇总在一起 在java7引入了fork/join框架来实现并行,在这篇文章中,将介绍利用Stream来实现并行和所需要注意的事项,并且介绍fork/join框架 之前我们提到过stream()是顺序执行,而parallelStream()是并行执行,并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流,这样可以把压力分担给不同的内核 下面是一个例子:就是求和操作,从1加到给出的n,我们用顺序流实现一下 @Test public void test() throws Exception { long l = System.currentTimeMillis(); System.out.println(parallelS...
相关文章
文章评论
共有0条评论来说两句吧...