首页 文章 精选 留言 我的

精选列表

搜索[高并发],共10000篇文章
优秀的个人博客,低调大师

java并发(二):深入分析volatile实现原理

volatile的原理实现可以看这篇文章,真的是从硬件层面上说明了volatile怎样保证可见性 下面这个实例,如果没有设置成volatile关键字,那么线程读的 isRunning永远都是自己私有内存中的,线程将会一直在while循环中 public class RunThread extends Thread{ private volatile boolean isRunning = true; private void setRunning(boolean isRunning){ this.isRunning = isRunning; } public void run(){ System.out.println("进入run方法.."); int i = 0; while(isRunning == true){ //.. } System.out.println("线程停止"); } public static void main(String[] args) throws InterruptedException { RunThread rt = new RunThread(); rt.start(); Thread.sleep(1000); rt.setRunning(false); System.out.println("isRunning的值已经被设置了false"); } } 这是展示volatile虽然有可见性,但是没有原子性: /** * volatile关键字不具备synchronized关键字的原子性(同步) * @author alienware * */ public class VolatileNoAtomic extends Thread{ private static volatile int count = 0; //这个被注释的代码可以保证结果正确 //private static AtomicInteger count = new AtomicInteger(0); private static void addCount(){ for (int i = 0; i < 1000; i++) { count++ ; //这个被注释的代码可以保证结果正确 //count.incrementAndGet(); } System.out.println(count); } public void run(){ addCount(); } public static void main(String[] args) { VolatileNoAtomic[] arr = new VolatileNoAtomic[100]; for (int i = 0; i < 10; i++) { arr[i] = new VolatileNoAtomic(); } for (int i = 0; i < 10; i++) { arr[i].start(); } } } 这是使用atomic,保证原子性的代码: public class AtomicUse { private static AtomicInteger count = new AtomicInteger(0); //多个addAndGet在一个方法内是非原子性的,需要加synchronized进行修饰,保证4个addAndGet整体原子性 /**synchronized*/ public synchronized int multiAdd(){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } count.addAndGet(1); count.addAndGet(2); count.addAndGet(3); count.addAndGet(4); //+10 return count.get(); } public static void main(String[] args) { final AtomicUse au = new AtomicUse(); List<Thread> ts = new ArrayList<Thread>(); for (int i = 0; i < 100; i++) { ts.add(new Thread(new Runnable() { @Override public void run() { System.out.println(au.multiAdd()); } })); } for(Thread t : ts){ t.start(); } } } 线程通信 ListAdd2.java,可以看出本来list已经到5了,那么t2应该出while循环抛异常,但是因为它执行了wait方法,释放锁了。而t1得到锁一直执行,虽然t1执行了notify方法,但是只是发出通知而已,只有它的方法执行完才释放锁让t2执行。 package com.bjsxt.base.conn008; import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; /** * wait notfiy 方法,wait释放锁,notfiy不释放锁 * @author alienware * */ public class ListAdd2 { private volatile static List list = new ArrayList(); public void add(){ list.add("bjsxt"); } public int size(){ return list.size(); } public static void main(String[] args) { final ListAdd2 list2 = new ListAdd2(); // 1 实例化出来一个 lock // 当使用wait 和 notify 的时候 , 一定要配合着synchronized关键字去使用 final Object lock = new Object(); // final CountDownLatch countDownLatch = new CountDownLatch(1); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { synchronized (lock) { for(int i = 0; i <10; i++){ list2.add(); System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素.."); Thread.sleep(500); if(list2.size() == 5){ System.out.println("已经发出通知.."); // countDownLatch.countDown(); lock.notify(); } } } } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { synchronized (lock) { if(list2.size() != 5){ try { System.out.println("t2进入..."); lock.wait(); // countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止.."); throw new RuntimeException(); } } }, "t2"); t2.start(); t1.start(); } } package com.xushu.multi; import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class MyQueue { private LinkedList<Object> list = new LinkedList<Object>(); private AtomicInteger count = new AtomicInteger(0); private final int minSize = 0; private final int maxSize; public MyQueue(int size) { this.maxSize = size; } private final Object lock = new Object(); public void put(Object obj) { synchronized (lock) { if (count.get() == this.maxSize) { try { lock.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } list.add(obj); count.incrementAndGet(); lock.notify(); System.out.println("新加入的元素为:" + obj); } } public Object take() { Object ret = null; synchronized (lock) { if (count.get() == minSize) { try { lock.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } ret = list.removeFirst(); count.decrementAndGet(); lock.notify(); } return ret; } public int getSize() { return this.count.get(); } public static void main(String[] args) { final MyQueue mq = new MyQueue(5); mq.put("a"); mq.put("b"); mq.put("c"); mq.put("d"); System.out.println("当前容器的长度:" + mq.getSize()); Thread t1 = new Thread(new Runnable() { @Override public void run() { mq.put("f"); mq.put("g"); mq.put("e"); } }, "t1"); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { Object o1 = mq.take(); System.out.println("移除的元素为:" + o1); Object o2 = mq.take(); System.out.println("移除的元素为:" + o2); } }, "t2"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } t2.start(); } }

优秀的个人博客,低调大师

Java并发编程(2) AbstractQueuedSynchronizer的内部结构

一 前言 虽然已经有很多前辈已经分析过AbstractQueuedSynchronizer(简称AQS,也叫队列同步器)类,但是感觉那些点始终是别人的,看一遍甚至几遍终不会印象深刻。所以还是记录下来印象更深刻,还能和大家一起探讨(这就是重复造轮子的好处,另外也主要是这篇篇幅太长了,犹豫了好久才决定写作)。既然有很多前辈都分析过这个类说明它是多么的重要,下面我们看下concurrent包的实现示意图就清楚AQS的所占有的地位了。 二 什么是AQS AbstractQueuedSynchronizer,中文简称队列同步器,英文简称AQS。它是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。从上面图可以看出AQS是实现锁或任意同步组件的关键,通过继承同步器并实现它的抽象方法来管理同步状态等。 三 AQS的内部结构 个人习惯喜欢先看其内部结构,因为内部结果是一个类实现的核心。经过分析得知:AQS类底层的数据结构是使用双向链表,包括head结点和tail结点,head结点主要用作后续的调度。另外还包含一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition 链表(其中链表是队列的一种具体表现,所以也可称作队列)。如下图: 四 内部结构源码解析 3.1 类的继承关系 1、说明它是一个抽象类,就说明它可能存在抽象方法需要子类去重写实现(具体有哪些方法需要重写后续会说明)。 2、它还继承了AbstractOwnableSynchronizer(简称AOS)类可以设置独占资源线程和获取独占资源线程(独占锁会涉及到,AOS的源码自己可以进去看看)。 另外建议各位多看看类上的注释,其实还蛮有作用的。 3.2 类的内部类 先分析内部类中的结构再看AQS是怎么引用它的。下面先看Node.class,主要分析都在注释上了。 /** * Wait queue node class. * 注意看类上的注释,上面是原注释的第一行,表示等待队列节点类(虽然实际上是一个双向链表)。 */ static final class Node { /** * 总共分为两者模式:共享和独占 */ /** 在共享模式中等待的节点 */ static final Node SHARED = new Node(); /** 在独占模式中等待的节点 */ static final Node EXCLUSIVE = null; /** * 下面几个表示节点状态,也就是waitStatus所具有可能的值。 */ /** * 标记线程处于取消状态 * 节点进入该状态就不会变化。 * / static final int CANCELLED = 1; /** * 标记后继节点的线程处于等待状态,需要被取消停放(即被唤醒unpark)。 * 变化情况:当当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。 */ static final int SIGNAL = -1; /** * 标记线程正在等待条件(Condition),也就是该节点处于等待队列中。 * 变化情况:当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中。 */ static final int CONDITION = -2; /** * 表示下一次共享式同步状态获取将会无条件的被传播下去。 */ static final int PROPAGATE = -3; /** * 节点状态,包含上面四种状态(另外还有一种初始化状态0) * 特别注意:它是volatile关键字修饰的,保证对其线程可见性,但是不保证原子性。 * 所以更新状态时,采用CAS方式去更新, 如:compareAndSetWaitStatus */ volatile int waitStatus; /** * 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。 */ volatile Node prev; /** * 后继节点。 */ volatile Node next; /** * 入队列时的当前线程。 */ volatile Thread thread; /** * 存储condition队列中的后继节点。 */ Node nextWaiter; /** * 判断是否共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 获取前置节点,如果前置节点为空就抛出异常 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 省略三个构造函数 } 总结下:当每个线程被阻塞时都会封装成一个Node节点,放入队列中。每个节点都包含了当前节点对应的线程、状态、前置节点引用、后继节点引用以及下一个等待者。 其中还需要注意的是waitStatus对应的各个状态代表着什么意思,另外不清楚volatile关键字作用的请前去阅读下。 接下来简单看看ConditionObject的源码,后续我们会单独分析下这个类的作用。 /** * 实现Condition接口 */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** * 条件队列的第一个节点。 */ private transient AbstractQueuedSynchronizer.Node firstWaiter; /** * 条件队列的最后一个节点。 */ private transient AbstractQueuedSynchronizer.Node lastWaiter; } 从中可以看它还是实现了Condition接口,而Condition接口又定义了什么规范呢?自己去看:),你会不会发现有点跟Object中的几个方法类似呢。 3.3 主要内部成员 // 头结点 private transient volatile Node head; // 尾结点 private transient volatile Node tail; // 同步状态 private volatile int state; 五 总结 通过上述分析就很清楚其内部结构是什么了吧。总结下: 节点(Node)是成为sync队列和condition队列构建的基础,在同步器中就包含了sync队列(Node双向链表)。同步器拥有三个成员变量:sync队列的头结点head、sync队列的尾节点tail和状态state。对于锁的获取,请求形成节点,将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。对于同步器维护的状态state,多个线程对其的获取将会产生一个链式的结构。 文章来源:https://www.cnblogs.com/yuanfy008/p/9608666.html 推荐阅读:https://www.roncoo.com/course/list.html?courseName=%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B

优秀的个人博客,低调大师

Java并发编程笔记之ThreadLocal内存泄漏探究

使用 ThreadLocal 不当可能会导致内存泄露,是什么原因导致的内存泄漏呢? 我们首先看一个例子,代码如下: /** * Created by cong on 2018/7/14. */ public class ThreadLocalOutOfMemoryTest { static class LocalVariable { private Long[] a = new Long[1024*1024]; } // (1) final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(6,6,1,TimeUnit.MINUTES, new LinkedBlockingQueue<>()); // (2) final static ThreadLocal<LocalVariable> localVariable = new ThreadLocal<LocalVariable>(); public static void main(String[] args) throws InterruptedException { // (3) for (int i = 0; i < 50; ++i) { poolExecutor.execute(new Runnable() { public void run() { // (4) localVariable.set(new LocalVariable()); // (5) System.out.println("use local varaible"); // localVariable.remove(); } }); Thread.sleep(1000); } // (6) System.out.println("pool execute over"); } } 代码(1)创建了一个核心线程数和最大线程数为 6 的线程池,这个保证了线程池里面随时都有 6 个线程在运行。 代码(2)创建了一个 ThreadLocal 的变量,泛型参数为 LocalVariable,LocalVariable 内部是一个 Long 数组。 代码(3)向线程池里面放入 50 个任务。 代码(4)设置当前线程的 localVariable 变量,也就是把 new 的 LocalVariable 变量放入当前线程的 threadLocals 变量。 由于没有调用线程池的 shutdown 或者 shutdownNow 方法所以线程池里面的用户线程不会退出,进而 JVM 进程也不会退出。 运行后,我们立即打开jconsole监控堆内存变化,如下图: 接着,让我们打开 localVariable.remove() 注释,然后在运行,观察堆内存变化如下: 从第一次运行结果可知,当主线程处于休眠时候进程占用了大概 75M 内存,打开 localVariable.remove() 注释后第二次运行则占用了大概 25M 内存,可知没有写 localVariable.remove()时候内存发生了泄露,下面分析下泄露的原因,如下: 第一次运行的代码,在设置线程的 localVariable 变量后没有调用localVariable.remove()方法,导致线程池里面的 5 个线程的 threadLocals 变量里面的new LocalVariable()实例没有被释放,虽然线程池里面的任务执行完毕了,但是线程池里面的 5 个线程会一直存在直到 JVM 退出。这里需要注意的是由于 localVariable 被声明了 static,虽然线程的 ThreadLocalMap 里面是对 localVariable 的弱引用,localVariable 也不会被回收。运行结果二的代码由于线程在设置 localVariable 变量后即使调用了localVariable.remove()方法进行了清理,所以不会存在内存泄露。 接下来我们要想清楚的知道内存泄漏的根本原因,那么我们就要进入源码去看了。 我们知道ThreadLocal 只是一个工具类,具体存放变量的是在线程的 threadLocals 变量里面,threadLocals 是一个 ThreadLocalMap 类型的,我们首先一览ThreadLocalMap的类图结构,类图结构如下图: 如上图 ThreadLocalMap 内部是一个 Entry 数组, Entry 继承自 WeakReference,Entry 内部的 value 用来存放通过 ThreadLocal 的 set 方法传递的值,那么 ThreadLocal 对象本身存放到哪里了吗? 下面看看 Entry 的构造函数,如下所示: Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } 接着我们再接着看Entry的父类WeakReference的构造函数super(k),如下所示: public WeakReference(T referent) { super(referent); } 接着我们再看WeakReference的父类Reference的构造函数super(referent),如下所示: Reference(T referent) { this(referent, null); } 接着我们再看WeakReference的父类Reference的另外一个构造函数this(referent , null),如下所示: Reference(T referent, ReferenceQueue<? super T> queue) { this.referent = referent; this.queue = (queue == null) ? ReferenceQueue.NULL : queue; } 可知 k 被传递到了 WeakReference 的构造函数里面,也就是说 ThreadLocalMap 里面的 key 为 ThreadLocal 对象的弱引用,具体是 referent 变量引用了 ThreadLocal 对象,value 为具体调用 ThreadLocal 的 set 方法传递的值。 当一个线程调用 ThreadLocal 的 set 方法设置变量时候,当前线程的 ThreadLocalMap 里面就会存放一个记录,这个记录的 key 为 ThreadLocal 的引用,value 则为设置的值。 但是考虑如果这个 ThreadLocal 变量没有了其他强依赖,而当前线程还存在的情况下,由于线程的 ThreadLocalMap 里面的 key 是弱依赖,则当前线程的 ThreadLocalMap 里面的 ThreadLocal 变量的弱引用会被在 gc 的时候回收,但是对应 value 还是会造成内存泄露,这时候 ThreadLocalMap 里面就会存在 key 为 null 但是 value 不为 null 的 entry 项。 其实在 ThreadLocal 的 set 和 get 和 remove 方法里面有一些时机是会对这些 key 为 null 的 entry 进行清理的,但是这些清理不是必须发生的,下面简单讲解ThreadLocalMap 的 remove 方法的清理过程,remove 的源码,如下所示: private void remove(ThreadLocal<?> key) { //(1)计算当前ThreadLocal变量所在table数组位置,尝试使用快速定位方法 Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); //(2)这里使用循环是防止快速定位失效后,变量table数组 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { //(3)找到 if (e.get() == key) { //(4)找到则调用WeakReference的clear方法清除对ThreadLocal的弱引用 e.clear(); //(5)清理key为null的元素 expungeStaleEntry(i); return; } } } private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; //(6)去掉去value的引用 tab[staleSlot].value = null; tab[staleSlot] = null; size--; Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); //(7)如果key为null,则去掉对value的引用。 if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; } 代码(4)调用了 Entry 的 clear 方法,实际调用的是父类 WeakReference 的 clear 方法,作用是去掉对 ThreadLocal 的弱引用。 代码(6)是去掉对 value 的引用,到这里当前线程里面的当前 ThreadLocal 对象的信息被清理完毕了。 代码(7)从当前元素的下标开始看 table 数组里面的其他元素是否有 key 为 null 的,有则清理。循环退出的条件是遇到 table 里面有 null 的元素。所以这里知道 null 元素后面的 Entry 里面 key 为 null 的元素不会被清理。 总结: 1.ThreadLocalMap 内部 Entry 中 key 使用的是对 ThreadLocal 对象的弱引用,这为避免内存泄露是一个进步,因为如果是强引用,那么即使其他地方没有对 ThreadLocal 对象的引用,ThreadLocalMap 中的 ThreadLocal 对象还是不会被回收,而如果是弱引用则这时候 ThreadLocal 引用是会被回收掉的。 2.但是对于的 value 还是不能被回收,这时候 ThreadLocalMap 里面就会存在 key 为 null 但是 value 不为 null 的 entry 项,虽然 ThreadLocalMap 提供了 set,get,remove 方法在一些时机下会对这些 Entry 项进行清理,但是这是不及时的,也不是每次都会执行的,所以一些情况下还是会发生内存泄露,所以在使用完毕后即使调用 remove 方法才是解决内存泄露的最好办法。 3.线程池里面设置了 ThreadLocal 变量一定要记得及时清理,因为线程池里面的核心线程是一直存在的,如果不清理,那么线程池的核心线程的 threadLocals 变量一直会持有 ThreadLocal 变量。

优秀的个人博客,低调大师

Java并发编程的艺术(十)——线程池(1)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_34173549/article/details/79612287 线程池的作用 减少资源的开销减少了每次创建线程、销毁线程的开销。 提高响应速度每次请求到来时,由于线程的创建已经完成,故可以直接执行任务,因此提高了响应速度。 提高线程的可管理性线程是一种稀缺资源,若不加以限制,不仅会占用大量资源,而且会影响系统的稳定性。因此,线程池可以对线程的创建与停止、线程数量等等因素加以控制,使得线程在一种可控的范围内运行,不仅能保证系统稳定运行,而且方便性能调优。 线程池的实现原理 线程池一般由两种角色构成:多个工作线程 和 一个阻塞队列。 工作线程工作线程是一组已经处在运行中的线程,它们不断地向阻塞队列中领取任务执行。 阻塞队列阻塞队列用于存储工作线程来不及处理的任务。当工作线程都在执行任务时,到来的新任务就只能暂时在阻塞队列中存储。 ThreadPoolExecutor的使用 创建线程池 通过如下代码即可创建一个线程池: new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, runnableTaskQueue, handler); 1 corePoolSize:基本线程数量它表示你希望线程池达到的一个值。线程池会尽量把实际线程数量保持在这个值上下。 maximumPoolSize:最大线程数量这是线程数量的上界。如果实际线程数量达到这个值: 阻塞队列未满:任务存入阻塞队列等待执行 阻塞队列已满:调用饱和策略 keepAliveTime:空闲线程的存活时间当实际线程数量超过corePoolSize时,若线程空闲的时间超过该值,就会被停止。PS:当任务很多,且任务执行时间很短的情况下,可以将该值调大,提高线程利用率。 timeUnit:keepAliveTime的单位 runnableTaskQueue:任务队列这是一个存放任务的阻塞队列,可以有如下几种选择: ArrayBlockingQueue它是一个由数组实现的阻塞队列,FIFO。 LinkedBlockingQueue它是一个由链表实现的阻塞队列,FIFO。吞吐量通常要高于ArrayBlockingQueue。fixedThreadPool使用的阻塞队列就是它。它是一个无界队列。 SynchronousQueue它是一个没有存储空间的阻塞队列,任务提交给它之后必须要交给一条工作线程处理;如果当前没有空闲的工作线程,则立即创建一条新的工作线程。cachedThreadPool用的阻塞队列就是它。它是一个无界队列。 PriorityBlockingQueue它是一个优先权阻塞队列。 handler:饱和策略当实际线程数达到maximumPoolSize,并且阻塞队列已满时,就会调用饱和策略。JDK1.5由四种饱和策略: AbortPolicy默认。直接抛异常。 CallerRunsPolicy只用调用者所在的线程执行任务。 DiscardOldestPolicy丢弃任务队列中最久的任务。 DiscardPolicy丢弃当前任务。 提交任务 可以向ThreadPoolExecutor提交两种任务:Callable和Runnable。 Callable该类任务有返回结果,可以抛出异常。通过submit函数提交,返回Future对象。可通过get获取执行结果。 Runnable该类任务只执行,无法获取返回结果,并在执行过程中无法抛异常。通过execute提交。 关闭线程池 关闭线程池有两种方式:shutdown和shutdownNow,关闭时,会遍历所有的线程,调用它们的interrupt函数中断线程。但这两种方式对于正在执行的线程处理方式不同。 shutdown()仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。 shutdownNow()不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。 ThreadPoolExecutor运行机制 当有请求到来时: 若当前实际线程数量少于corePoolSize,即使有空闲线程,也会创建一个新的工作线程; 若当前实际线程数量处于corePoolSize和maximumPoolSize之间,并且阻塞队列没满,则任务将被放入阻塞队列中等待执行; 若当前实际线程数量小于maximumPoolSize,但阻塞队列已满,则直接创建新线程处理任务; 若当前实际线程数量已经达到maximumPoolSize,并且阻塞队列已满,则使用饱和策略。 设置合理的线程池大小 任务一般可分为:CPU密集型、IO密集型、混合型,对于不同类型的任务需要分配不同大小的线程池。 CPU密集型任务尽量使用较小的线程池,一般为CPU核心数+1。因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。 IO密集型任务可以使用稍大的线程池,一般为2*CPU核心数。IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。 混合型任务可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。

优秀的个人博客,低调大师

java面试- Java并发编程(十)——线程池(1)

线程池的作用 减少资源的开销减少了每次创建线程、销毁线程的开销。 提高响应速度每次请求到来时,由于线程的创建已经完成,故可以直接执行任务,因此提高了响应速度。 提高线程的可管理性线程是一种稀缺资源,若不加以限制,不仅会占用大量资源,而且会影响系统的稳定性。因此,线程池可以对线程的创建与停止、线程数量等等因素加以控制,使得线程在一种可控的范围内运行,不仅能保证系统稳定运行,而且方便性能调优。 线程池的实现原理 线程池一般由两种角色构成:多个工作线程 和 一个阻塞队列。 工作线程工作线程是一组已经处在运行中的线程,它们不断地向阻塞队列中领取任务执行。 阻塞队列阻塞队列用于存储工作线程来不及处理的任务。当工作线程都在执行任务时,到来的新任务就只能暂时在阻塞队列中存储。 ThreadPoolExecutor的使用 创建线程池 通过如下代码即可创建一个线程池: new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, runnableTaskQueue, handler); 1 corePoolSize:基本线程数量它表示你希望线程池达到的一个值。线程池会尽量把实际线程数量保持在这个值上下。 maximumPoolSize:最大线程数量这是线程数量的上界。如果实际线程数量达到这个值: 阻塞队列未满:任务存入阻塞队列等待执行 阻塞队列已满:调用饱和策略 keepAliveTime:空闲线程的存活时间当实际线程数量超过corePoolSize时,若线程空闲的时间超过该值,就会被停止。PS:当任务很多,且任务执行时间很短的情况下,可以将该值调大,提高线程利用率。 timeUnit:keepAliveTime的单位 runnableTaskQueue:任务队列这是一个存放任务的阻塞队列,可以有如下几种选择: ArrayBlockingQueue它是一个由数组实现的阻塞队列,FIFO。 LinkedBlockingQueue它是一个由链表实现的阻塞队列,FIFO。吞吐量通常要高于ArrayBlockingQueue。fixedThreadPool使用的阻塞队列就是它。它是一个无界队列。 SynchronousQueue它是一个没有存储空间的阻塞队列,任务提交给它之后必须要交给一条工作线程处理;如果当前没有空闲的工作线程,则立即创建一条新的工作线程。cachedThreadPool用的阻塞队列就是它。它是一个无界队列。 PriorityBlockingQueue它是一个优先权阻塞队列。 handler:饱和策略当实际线程数达到maximumPoolSize,并且阻塞队列已满时,就会调用饱和策略。JDK1.5由四种饱和策略: AbortPolicy默认。直接抛异常。 CallerRunsPolicy只用调用者所在的线程执行任务。 DiscardOldestPolicy丢弃任务队列中最久的任务。 DiscardPolicy丢弃当前任务。 提交任务 可以向ThreadPoolExecutor提交两种任务:Callable和Runnable。 Callable该类任务有返回结果,可以抛出异常。通过submit函数提交,返回Future对象。可通过get获取执行结果。 Runnable该类任务只执行,无法获取返回结果,并在执行过程中无法抛异常。通过execute提交。 关闭线程池 关闭线程池有两种方式:shutdown和shutdownNow,关闭时,会遍历所有的线程,调用它们的interrupt函数中断线程。但这两种方式对于正在执行的线程处理方式不同。 shutdown()仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。 shutdownNow()不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。 ThreadPoolExecutor运行机制 当有请求到来时: 若当前实际线程数量少于corePoolSize,即使有空闲线程,也会创建一个新的工作线程; 若当前实际线程数量处于corePoolSize和maximumPoolSize之间,并且阻塞队列没满,则任务将被放入阻塞队列中等待执行; 若当前实际线程数量小于maximumPoolSize,但阻塞队列已满,则直接创建新线程处理任务; 若当前实际线程数量已经达到maximumPoolSize,并且阻塞队列已满,则使用饱和策略。 设置合理的线程池大小 任务一般可分为:CPU密集型、IO密集型、混合型,对于不同类型的任务需要分配不同大小的线程池。 CPU密集型任务尽量使用较小的线程池,一般为CPU核心数+1。因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。 IO密集型任务可以使用稍大的线程池,一般为2*CPU核心数。IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。 混合型任务可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。

优秀的个人博客,低调大师

Java并发编程的艺术(十一)——线程池(2)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_34173549/article/details/79612326 Executor两级调度模型 在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。在Java虚拟机层面,用户将多个任务提交给Executor框架,Executor负责分配线程执行它们;在操作系统层面,操作系统再将这些线程分配给处理器执行。 Executor结构 Executor框架中的所有类可以分成三类: 任务任务有两种类型:Runnable和Callable。 任务执行器Executor框架最核心的接口是Executor,它表示任务的执行器。Executor的子接口为ExecutorService。ExecutorService有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor。 执行结果Future接口表示异步的执行结果,它的实现类为FutureTask。 线程池 Executors工厂类可以创建四种类型的线程池,通过Executors.newXXX即可创建。 1. FixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } 1 2 3 它是一种固定大小的线程池; corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads; keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效; 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列; 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务; 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。 2. CachedThreadPool public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>()); } 1 2 3 它是一个可以无限扩大的线程池; 它比较适合处理执行时间比较小的任务; corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大; keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死; 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。 3. SingleThreadExecutor public static ExecutorService newSingleThreadExecutor(){ return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } 1 2 3 它只会创建一条工作线程处理任务; 采用的阻塞队列为LinkedBlockingQueue; 4. ScheduledThreadPool 它用来处理延时任务或定时任务。 它接收SchduledFutureTask类型的任务,有两种提交任务的方式: scheduledAtFixedRate scheduledWithFixedDelay SchduledFutureTask接收的参数: time:任务开始的时间 sequenceNumber:任务的序号 period:任务执行的时间间隔 它采用DelayQueue存储等待的任务 DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序; DelayQueue也是一个无界队列; 工作线程的执行过程: 工作线程会从DelayQueue取已经到期的任务去执行; 执行结束后重新设置任务的到期时间,再次放回DelayQueue

优秀的个人博客,低调大师

java面试-Java并发编(十一)——线程池(2)

Executor两级调度模型 在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。在Java虚拟机层面,用户将多个任务提交给Executor框架,Executor负责分配线程执行它们;在操作系统层面,操作系统再将这些线程分配给处理器执行。 Executor结构 Executor框架中的所有类可以分成三类: 任务任务有两种类型:Runnable和Callable。 任务执行器Executor框架最核心的接口是Executor,它表示任务的执行器。Executor的子接口为ExecutorService。ExecutorService有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor。 执行结果Future接口表示异步的执行结果,它的实现类为FutureTask。 线程池 Executors工厂类可以创建四种类型的线程池,通过Executors.newXXX即可创建。 1. FixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } 1 2 3 它是一种固定大小的线程池; corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads; keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效; 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列; 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务; 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。 2. CachedThreadPool public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>()); } 1 2 3 它是一个可以无限扩大的线程池; 它比较适合处理执行时间比较小的任务; corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大; keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死; 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。 3. SingleThreadExecutor public static ExecutorService newSingleThreadExecutor(){ return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } 1 2 3 它只会创建一条工作线程处理任务; 采用的阻塞队列为LinkedBlockingQueue; 4. ScheduledThreadPool 它用来处理延时任务或定时任务。 它接收SchduledFutureTask类型的任务,有两种提交任务的方式: scheduledAtFixedRate scheduledWithFixedDelay SchduledFutureTask接收的参数: time:任务开始的时间 sequenceNumber:任务的序号 period:任务执行的时间间隔 它采用DelayQueue存储等待的任务 DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序; DelayQueue也是一个无界队列; 工作线程的执行过程: 工作线程会从DelayQueue取已经到期的任务去执行; 执行结束后重新设置任务的到期时间,再次放回DelayQueue

优秀的个人博客,低调大师

Java并发编程 -- 深入剖析volatile关键字

1.volatile关键字的两层语义 一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义: 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。 禁止进行指令重排序。 我们来看一段代码: 假如线程1先执行,线程2后执行: //线程1 boolean stop = false; while(!stop){ //doSomething(); } //线程2 stop = true; 事实上,这段代码会真的先执行线程1,然后再执行2吗,答案是肯定的:不是。两个线程各干各的事情,没有绝对的先后问题,所以会出现两种答案(一个线程用stop=false跑线程1,一个线程用stop=true跑线程二,互不相关)。 但是用volatile修饰之后就变得不一样了: 第一:使用volatile关键字会强制将修改的值立即写入主存; 第二:使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量stop的缓存行无效(反映到硬件层的话,就是CPU的L1或者L2缓存中对应的缓存行无效); 第三:由于线程1的工作内存中缓存变量stop的缓存行无效,所以线程1再次读取变量stop的值时会去主存读取。 所以肯定能保证不管 多少个线程跑的时候,stop的值是相同的。这是利用了volatile的线程可见性原理。 2.volatile保证原子性吗? 从上面知道volatile关键字保证了操作的可见性,但是volatile能保证对变量的操作是原子性吗?看一段代码: public class Test { public volatile int inc = 0; public void increase() { inc++; } public static void main(String[] args) { final Test test = new Test(); for(int i=0;i<10;i++){ new Thread(){ public void run() { for(int j=0;j<1000;j++) test.increase(); }; }.start(); } while(Thread.activeCount()>1) //保证前面的线程都执行完 Thread.yield(); System.out.println(test.inc); } } 经过上面的介绍volatile的可见性,我想大家能很快的得出这段代码的答案10000。然而事实上打印出来的数字总比10000小。这就涉及到了volatile与原子性操作的联系。 原因:自增操作不是原子性操作,而且volatile也无法保证对变量的任何操作都是原子性的。 自增操作是不具备原子性的,它包括读取变量的原始值、进行加1操作、写入工作内存。那么就是说自增操作的三个子操作可能会分割开执行 假如线程1从住内存中获取到变量值,在执行自增的时候是阻塞性质的,这时候线程2也拿到一个相同的值,然后也进行自增,那么这两个线程最终写入的值是一样的。 那如何修改呢?有三种方式 给自增方法加上同步锁synchronized public class Test { public int inc = 0; public synchronized void increase() { inc++; } public static void main(String[] args) { final Test test = new Test(); for(int i=0;i<10;i++){ new Thread(){ public void run() { for(int j=0;j<1000;j++) test.increase(); }; }.start(); } while(Thread.activeCount()>1) //保证前面的线程都执行完 Thread.yield(); System.out.println(test.inc); } } 采用Lock public class Test { public int inc = 0; Lock lock = new ReentrantLock(); public void increase() { lock.lock(); try { inc++; } finally{ lock.unlock(); } } public static void main(String[] args) { final Test test = new Test(); for(int i=0;i<10;i++){ new Thread(){ public void run() { for(int j=0;j<1000;j++) test.increase(); }; }.start(); } while(Thread.activeCount()>1) //保证前面的线程都执行完 Thread.yield(); System.out.println(test.inc); } } 采用AtomicInteger public class Test { public AtomicInteger inc = new AtomicInteger(); public void increase() { inc.getAndIncrement(); } public static void main(String[] args) { final Test test = new Test(); for(int i=0;i<10;i++){ new Thread(){ public void run() { for(int j=0;j<1000;j++) test.increase(); }; }.start(); } while(Thread.activeCount()>1) //保证前面的线程都执行完 Thread.yield(); System.out.println(test.inc); } } 在java 1.5的java.util.concurrent.atomic包下提供了一些原子操作类,即对基本数据类型的 自增(加1操作),自减(减1操作)、以及加法操作(加一个数),减法操作(减一个数)进行了封装,保证这些操作是原子性操作。atomic是利用CAS来实现原子性操作的(Compare And Swap),CAS实际上是利用处理器提供的CMPXCHG指令实现的,而处理器执行CMPXCHG指令是一个原子性操作。 3.volatile能保证有序性吗? 在前面提到volatile关键字能禁止指令重排序,所以volatile能在一定程度上保证有序性。 4.volatile的原理和实现机制 前面讲述了源于volatile关键字的一些使用,下面我们来探讨一下volatile到底如何保证可见性和禁止指令重排序的。 下面这段话摘自《深入理解Java虚拟机》: “观察加入volatile关键字和没有加入volatile关键字时所生成的汇编代码发现,加入volatile关键字时,会多出一个lock前缀指令” lock前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供3个功能: 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成; 它会强制将对缓存的修改操作立即写入主存; 如果是写操作,它会导致其他CPU中对应的缓存行无效。

优秀的个人博客,低调大师

Java并发编程 -- 线程安全、优先级设定

优先级设定 一个合理的优先级可以在一定条件下避免一些活跃性问题,比如死锁、饥饿等 public class Task implements Runnable{ @Override public void run() { while (true) { System.out.println(Thread.currentThread().getName() + "线程执行了..."); } } } public class PriorityTest { public static void main(String[] args) { Thread t1 = new Thread(new Task()); Thread t2 = new Thread(new Task()); /** * 设置优先级 * MAX_PRIORITY=10 * MIN_PRIORITY=1 * NORM_PRIORITY=5 */ t1.setPriority(Thread.NORM_PRIORITY); t2.setPriority(Thread.MAX_PRIORITY); t1.start(); t2.start(); } } 线程安全问题解决 当多个线程在非原子处理下操作相同的资源时,难免出现资源的混乱。 我在这里举个value++的例子。 无线程安全时 public class Task{ public int value = 0; // 没有处理线程安全 public int getValue() { return value++; } public static void main(String[] args) { Task task = new Task(); new Thread(){ @Override public void run() { while (true) { System.out.println(Thread.currentThread().getName() + " " + task.getValue()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); new Thread(){ @Override public void run() { while (true) { System.out.println(Thread.currentThread().getName() + " " + task.getValue()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } } image.png 我们会发现2和5有重复。这是jvm底层的问题,我在后文有分析。 处理线程安全 如果我们给线程加锁,将其变成原子处理,就会解决该问题。 public synchronized int getValue() { return value++; } 既在修饰符后加上synchronized关键字。 image.png 但是又有一个问题,这样的话,其实原理上是串行处理的,那我们该如果解决这个问题呢。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册