您现在的位置是:首页 > 文章详情

浅谈JDK 1.8中的PrarllelStream

日期:2019-10-16点击:484

Stream是JDK 1.8中新增加的一个特性,就如同一个高级版迭代器(Iterator),可无限数据源,单向,不可往复,遍历过一次后即用尽了,正如流水一去不复返。而和迭代器只能命令式地、串行化操作不同,Stream可以并行化操作;
而ParallelStream正是一个并行执行的流,它是通过默认的ForkJoinPool提高多线程任务处理速度;

1. 一个栗子

static List<String> construct() { List<String> Strings = new ArrayList<String>(); for (int i = 0; i < 50; i++) { String p = "name" + i; Strings.add(p); } return Strings; } static void doFor(List<String> Strings) { long start = System.currentTimeMillis(); for (String p : Strings) { try { Thread.sleep(100); } catch (InterruptedException e) { } // System.out.println(p); } long end = System.currentTimeMillis(); System.out.println("doFor cost:" + (end - start)); } static void doStream(List<String> Strings) { long start = System.currentTimeMillis(); Strings.stream().forEach(x -> { try { Thread.sleep(100); } catch (InterruptedException e) { } // System.out.println(x); }); long end = System.currentTimeMillis(); System.out.println("doStream cost:" + (end - start)); } static void doParallelStream(List<String> Strings) { long start = System.currentTimeMillis(); Strings.parallelStream().forEach(x -> { try { Thread.sleep(100); } catch (InterruptedException e) { } // System.out.println(x); }); long end = System.currentTimeMillis(); System.out.println("doParallelStream cost:" + (end - start)); }

运行结果:

doFor cost:5119 doStream cost:5221 doParallelStream cost:724

从执行结果来看,stream顺序输出,而parallelStream无序输出;parallelStream执行效率最快;
下面来刨析下背后的运行机制;

2. 认识 ForkJoin

ForkJoin是JDK 1.7中推出的一个新特性,它同ThreadPoolExecutor一样实现了Executor和ExecutorService接口。核心线程的数量默认值采用当前可用的CPU数量,并使用了一个无限队列来保存需要执行的任务;

ForkJoinPool的核心算法主要是分治法(Divide-and-Conquer Algorithm),可以将一个任务分拆为多个子任务(所有子任务都完成之后才执行主任务),子任务执行完毕后,再把结果合并起来;能够使用相对少的线程来处理大量的任务,并且这些任务之间是有父子依赖的,必须是子任务执行完成后,父任务才能执行;也可以让其中的线程创建新的任务,并挂起当前的任务,任务以及子任务会保留在一个内部队列中,此时线程就能够从队列中选择任务顺序执行。
_

如在典型的快速排序算法应用中:需要对1000万、个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于2时,会停止分割,转而使用插入排序对它们进行排序。所有的任务加起来会有大概2000000+个;

而同样的任务交由ThreadPoolExecutor则几乎是不可能的任务,因为ThreadPoolExecutor中的线程无法向任务队列中再添加一个任务并且在等待该任务完成之后再继续执行,同时也无法选择优先执行子任务,当需要完成200万个具有父子关系的任务时,需要200万个并行线程,显然这是不可行的。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行;

3. 了解 Work Stealing / 工作窃取

工作窃取(work-stealing)算法是整个forkjion框架的核心理念,是指某个线程从其他队列里窃取任务来执行;充分的利用了现代CPU多核,提高新能;

那么为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行;

4. ParallelStream 运行机制

JDK 1.8中ForkJoinPool添加了一个默认线程数量为CPU核心数的通用线程池静态类型,用来处理那些没有被显式提交到任何线程池的任务。如在上例子中,对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被通用线程池处理;代码的可读性和代码量较ThreadPoolExecutor明显更胜一筹;

ForkJoinPool的线程数量可以通过设置系统属性:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量)

另外需要注意是调用forEach方法时它会将执行forEach本身的线程也作为线程池中的一个工作线程。因此,即使将ForkJoinPool的通用线程池的线程数量设置为1,实际上也会有2个工作线程。因此在使用forEach的时候,线程数为1的ForkJoinPool通用线程池和线程数为2的ThreadPoolExecutor是等价的;所以当ForkJoinPool通用线程池实际需要4个工作线程时,可以将它设置成3,那么在运行时可用的工作线程就是4了;

5. 线程安全考虑

再看一个例子:

static void doThreadUnSafe() { List<Integer> listFor = new ArrayList<>(1000); List<Integer> listParallel = new ArrayList<>(1000); IntStream.range(0, 1000).forEach(listFor::add); IntStream.range(0, 1000).parallel().forEach(listParallel::add); System.out.println("listFor size :" + listFor.size()); System.out.println("listParallel size :" + listParallel.size()); }

运行结果:

listFor size :1000 listParallel size :917

显而易见,stream.parallel.forEach()中执行的操作并非线程安全。如果需要线程安全,可以把集合转换为同步集合,即:Collections.synchronizedList(new ArrayList<>())。

6. 正确使用 ParallelStream

当我们对parallelStream有了足够的了解之后,再来考虑是否需要使用ParallelStream:

  1. 使用ParallelStream可以简洁高效的写出并发代码;
  2. ParallelStream并行执行是无序的,因此对于依赖于顺序的任务而言,并行化也许不能给出正确的结果,在这种情况下需要慎重选择;
  3. ParallelStream提供了更简单的并发执行的实现,但并不意味着更高的性能;比如当数据量不大时,顺序执行往往比并行执行更快。毕竟线程池准备,频繁切换线程是耗时的。但是当任务涉及到I/O操作并且任务之间不互相依赖时,将这类程序并行化之后,执行速度将会明显的提升;
  4. 任务之间最好是状态无关的,因为ParallelStream默认是非线程安全的,可能带来结果的不确定性。
原文链接:https://yq.aliyun.com/articles/721180
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章