首页 文章 精选 留言 我的

精选列表

搜索[高并发],共10000篇文章
优秀的个人博客,低调大师

详解java并发包源码之AQS独占方法源码分析

AQS 的实现原理 学完用 AQS 自定义一个锁以后,我们可以来看一下刚刚使用过的方法的实现。 分析源码的时候会省略一些不重要的代码。 AQS 的实现是基于一个 FIFO 队列的,每一个等待的线程被封装成Node存放在等待队列中,头结点是空的,不存储信息,等待队列中的节点都是阻塞的,并且在每次被唤醒后都会检测自己的前一个节点是否为头结点,如果是头节点证明在这个线程之前没有在等待的线程,就尝试着去获取共享资源。 AQS 的继承关系 AQS 继承了AbstractOwnableSynchronizer,我们先分析一下这个父类。 public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractOwnableSynchronizer() { } /** * 独占模式下的线程 */ private transient Thread exclusiveOwnerThread; /** * 设置线程,只是对线程的 set 方法 */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } /** * 设置线程,对线程的 get 方法 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } } 父类非常简单,持有一个独占模式下的线程,然后就只剩下对这个线程的 get 和 set 方法。 AQS的内部类 AQS 是用链表队列来实现线程等待的,那么队列肯定要有节点,我们先从节点讲起。 Node 类,每一个等待的线程都会被封装成 Node 类 Node 的域 public class Node { int waitStatus; Node prev; Node next; Thread thread; Node nextWaiter; } waitStatus:等待状态 prev:前驱节点 next:后继节点 thread:持有的线程 nextWaiter:condiction 队列中的后继节点 Node 的 status: Node 的状态有四种: CANCELLED,值为 1,表示当前的线程被取消,被打断或者获取超时了 SIGNAL,值为 -1,表示当前节点的后继节点包含的线程需要运行,也就是 unpark; CONDITION,值为 -2,表示当前节点在等待 condition,也就是在 condition 队列中; PROPAGATE,值为 -3,表示当前场景下后续的 acquireShared 能够得以执行; 取消状态的值是唯一的正数,也是唯一当排队排到它了也不要资源而是直接轮到下个线程来获取资源的 AQS 中的方法源码分析 acquire 这个方法执行了: tryAcquire public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 看到上面的 tryAcquire 返回 false 后就会调用addWaiter新建节点加入等待队列中。参数 EXCLUSIVE 是独占模式。 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 拿到尾节点,如果尾节点是空则说明是第一个节点,就直接入队就好了 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果尾节点不是空的,则需要特殊方法入队 enq(node); return node; } 在addWaiter方法创建完节点后,调用 enq 方法,在循环中用 CAS 操作将新的节点入队。 因为可能会有多个线程同时设置尾节点,所以需要放在循环中不断的设置尾节点。 private Node enq(final Node node) { for (;;) { Node t = tail; // 查看尾节点 if (t == null) { // Must initialize // 尾节点为空则设置为刚刚创建的节点 if (compareAndSetHead(new Node())) tail = head; } else { // 尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 在这里,节点入队就结束了。 那么我们回来前面分析的方法, public final void acquire(long arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 刚刚分析完了addWaiter方法,这个方法返回了刚刚创建并且加入的队列。现在开始分析acquireQueued方法。 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 在循环中去获取锁 for (;;) { // 拿前一个节点 final Node p = node.predecessor(); // 如果前一个节点是头结点则尝试着去获取锁 if (p == head && tryAcquire(arg)) { // 拿到锁后把这个节点设为头结点,这里 setHead 会把除了 next 以外的数据清除 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 这个方法查看在获取锁失败以后是否中断,如果否的话就调用 // parkAndCheckInterrupt 阻塞方法线程,等待被唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } acquireInterruptibly 因为很像所以顺便来看一下acquireInterruptibly所调用的方法:在此我向大家推荐一个架构学习交流裙。交流学习裙号:821169538,里面会分享一些资深架构师录制的视频录像 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 只有这一句有差别,获取失败了并且检测到中断位被设为 true 直接抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } acquireNanos 再来看一下有限时间的,当获取超时以后会将节点 Node 的状态设为 cancel,设置为取消的用处在后面的 release 方法中会有体现。 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 总结一下过程 release 这个方法首先去调用了我们实现的 tryRelease,当结果返回成功的时候,拿到头结点,调用 unparkSuccessor 方法来唤醒头结点的下一个节点。在此我向大家推荐一个架构学习交流裙。交流学习裙号:821169538,里面会分享一些资深架构师录制的视频录像 public final boolean release(long arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitSatus; // 因为已经获取过锁,所以将状态设设为 0。失败也没所谓,说明有其他的线程把它设为0了 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 一般来说头结点的下一个节点是在等待着被唤醒的,但是如果是取消的或者意外的是空的, * 则向后遍历直到找到没有被取消的节点 * */ Node s = node.next; // 为空或者大于 0,只有 cancel 状态是大于 0 的 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

优秀的个人博客,低调大师

Java并发编程笔记之Semaphore信号量源码分析

JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢? Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。为了能够一览Semaphore的内部结构,我们首先要看一下Semaphore的类图,类图,如下所示: 如上类图可以知道Semaphoren内部还是使用AQS来实现的,Sync只是对AQS的一个修饰,并且Sync有两个实现类,分别代表获取信号量的时候是否采取公平策略。创建Semaphore的时候会有一个变量标示是否使用公平策略,源码如下: public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } Sync(int permits) { setState(permits); } 如上面代码所示,Semaphore默认使用的是非公平策略,如果你需要公平策略,则可以使用带两个参数的构造函数来构造Semaphore对象,另外和CountDownLatch一样,构造函数里面传递的初始化信号量个数 permits 被赋值给了AQS 的state状态变量,也就是说这里AQS的state值表示当前持有的信号量个数。 接下来我们主要看看Semaphore实现的主要方法的源码,如下: 1.void acquire() 当前线程调用该方法的时候,目的是希望获取一个信号量资源,如果当前信号量计数个数大于 0 ,并且当前线程获取到了一个信号量则该方法直接返回,当前信号量的计数会减少 1 。否则会被放入AQS的阻塞队列,当前线程被挂起,直到其他线程调用了release方法释放了信号量,并且当前线程通过竞争获取到了改信号量。当前线程被其他线程调用了 interrupte()方法中断后,当前线程会抛出 InterruptedException异常返回。源码如下: public void acquire() throws InterruptedException { //传递参数为1,说明要获取1个信号量资源 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //(1)如果线程被中断,则抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //(2)否者调用sync子类方法尝试获取,这里根据构造函数确定使用公平策略 if (tryAcquireShared(arg) < 0) //如果获取失败则放入阻塞队列,然后再次尝试如果失败则调用park方法挂起当前线程 doAcquireSharedInterruptibly(arg); } 如上代码可知,acquire()内部调用了sync的acquireSharedInterruptibly 方法,后者是对中断响应的(如果当前线程被中断,则抛出中断异常),尝试获取信号量资源的AQS的方法tryAcquireShared 是由 sync 的子类实现,所以这里就要分公平性了,这里先讨论非公平策略 NonfairSync 类的tryAcquireShared 方法,源码如下: protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取当前信号量值 int available = getState(); //计算当前剩余值 int remaining = available - acquires; //如果当前剩余小于0或者CAS设置成功则返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 如上代码,先计算当前信号量值(available)减去需要获取的值(acquires) 得到剩余的信号量个数(remaining),如果剩余值小于 0 说明当前信号量个数满足不了需求,则直接返回负数,然后当前线程会被放入AQS的阻塞队列,当前线程被挂起。如果剩余值大于 0 则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。另外可以知道NonFairSync是非公平性获取的,是说先调用aquire方法获取信号量的线程不一定比后来者先获取锁。 接下来我们要看看公平性的FairSync 类是如何保证公平性的,源码如下: protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } 可以知道公平性还是靠 hasQueuedPredecessors 这个方法来做的,以前的随笔已经讲过公平性是看当前线程节点是否有前驱节点也在等待获取该资源,如果是则自己放弃获取的权力,然后当前线程会被放入AQS阻塞队列,否则就去获取。hasQueuedPredecessors源码如下: public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 如上面代码所示,如果当前线程节点有前驱节点则返回true,否则如果当前AQS队列为空 或者 当前线程节点是AQS的第一个节点则返回 false ,其中,如果 h == t 则说明当前队列为空则直接返回 false,如果 h !=t 并且 s == null 说明有一个元素将要作为AQS的第一个节点入队列(回顾下 enq 函数第一个元素入队列是两步操作,首先创建一个哨兵头节点,然后第一个元素插入到哨兵节点后面),那么返回 true,如果 h !=t 并且 s != null 并且s.thread != Thread.currentThread() 则说明队列里面的第一个元素不是当前线程则返回 true。 2.void acquire(int permits) 该方法与 acquire() 不同在与后者只需要获取一个信号量值,而前者则获取指定 permits 个,源码如下: public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } 3.void acquireUninterruptibly() 该方法与 acquire() 类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源过程中(包含被阻塞后)其它线程调用了当前线程的 interrupt()方法设置了当前线程的中断标志当前线程并不会抛出 InterruptedException 异常而返回。源码如下: public void acquireUninterruptibly() { sync.acquireShared(1); } 4.void acquireUninterruptibly(int permits) 该方法与 acquire(int permits) 不同在于该方法对中断不响应。源码如如下: public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } 5.void release() 该方法作用是把当前 semaphore对象的信号量值增加 1 ,如果当前有线程因为调用 acquire 方法被阻塞放入了 AQS的阻塞队列,则会根据公平策略选择一个线程进行激活,激活的线程会尝试获取刚增加的信号量,源码如下: public void release() { //(1)arg=1 sync.releaseShared(1); } public final boolean releaseShared(int arg) { //(2)尝试释放资源 if (tryReleaseShared(arg)) { //(3)资源释放成功则调用park唤醒AQS队列里面最先挂起的线程 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { //(4)获取当前信号量值 int current = getState(); //(5)当前信号量值增加releases,这里为增加1 int next = current + releases; if (next < current) // 移除处理 throw new Error("Maximum permit count exceeded"); //(6)使用cas保证更新信号量值的原子性 if (compareAndSetState(current, next)) return true; } } 如上面代码可以看到 release()方法中对 sync.releaseShared(1),可以知道release方法每次只会对信号量值增加 1 ,tryReleaseShared方法是无限循环,使用CAS保证了 release 方法对信号量递增 1 的原子性操作。当tryReleaseShared 方法增加信号量成功后会执行代码(3),调用AQS的方法来激活因为调用acquire方法而被阻塞的线程。 6.void release(int permits) 该方法与不带参数的不同之处在于前者每次调用会在信号量值原来基础上增加 permits,而后者每次增加 1。源码如下: public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } 另外注意到这里调用的是 sync.releaseShared 是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会阻塞。 到目前已经知道了其原理,接下来用一个例子来加深对Semaphore的理解,例子如下: package com.hjc; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * Created by cong on 2018/7/8. */ public class SemaphoreTest { // 创建一个Semaphore实例 private static volatile Semaphore semaphore = new Semaphore(0); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); // 加入线程A到线程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " over"); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }); // 加入线程B到线程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " over"); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }); // 等待子线程执行完毕,返回 semaphore.acquire(2); System.out.println("all child thread over!"); //关闭线程池 executorService.shutdown(); } } 运行结果如下: 类似于 CountDownLatch,上面我们的例子也是在主线程中开启两个子线程进行执行,等所有子线程执行完毕后主线程在继续向下运行。 如上代码首先首先创建了一个信号量实例,构造函数的入参为 0,说明当前信号量计数器为 0,然后 main 函数添加两个线程任务到线程池,每个线程内部调用了信号量的 release 方法,相当于计数值递增一,最后在 main 线程里面调用信号量的 acquire 方法,参数传递为 2 说明调用 acquire 方法的线程会一直阻塞,直到信号量的计数变为 2 时才会返回。 看到这里也就明白了,如果构造 Semaphore 时候传递的参数为 N,在 M 个线程中调用了该信号量的 release 方法,那么在调用 acquire 对 M 个线程进行同步时候传递的参数应该是 M+N; 对CountDownLatch,CyclicBarrier,Semaphored这三者之间的比较总结: 1.CountDownLatch 通过计数器提供了更灵活的控制,只要检测到计数器为 0,而不管当前线程是否结束调用 await 的线程就可以往下执行,相比使用 jion 必须等待线程执行完毕后主线程才会继续向下运行更灵活。 2.CyclicBarrier 也可以达到 CountDownLatch 的效果,但是后者当计数器变为 0 后,就不能在被复用,而前者则使用 reset 方法可以重置后复用,前者对同一个算法但是输入参数不同的类似场景下比较适用。 3.而 semaphore 采用了信号量递增的策略,一开始并不需要关心需要同步的线程个数,等调用 aquire 时候在指定需要同步个数,并且提供了获取信号量的公平性策略。

优秀的个人博客,低调大师

Java并发机制底层实现原理-原子操作的实现原理

章节目录 原子操作含义 相关术语 保证多处理器操作原子性的两种方式 Java语言层面上实现原子操作 原子操作的含义: 原子本意是"不能被进一步分割的最小粒子",而原子操作意为,不可中断的一个或一系列操作。 相关术语 术语名称 英文 解释 缓存行 Cache line 缓存的最小操作单位 比较并交换 Compare and Swap CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较旧值有没有发生变化,如果没有发生变化,才交换成新值,发生变化,表示有多线程竞争,则不交换 保证多处理器操作原子性的两种方式 通过总线锁保证操作共享变量是原子性的 如果多个处理器同事对共享变量进行读改写操作(i++是经典的读改写操作),那 么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子性的,操作完之后,共享变量的值会和期望的不一致。 如: public class IncreaceThread implements Runnable { public int i = 1; public void run() { this.i = ++i; } public int getI(){ return this.i; } public static void main(String[] args) { IncreaceThread increaseThread = new IncreaceThread(); Thread thread1 = new Thread(increaseThread); Thread thread2 = new Thread(increaseThread); thread1.start(); thread2.start(); System.out.println(increaseThread.getI()); } } 计算出最终i的值有可能是2,而不是3。 原因可能是多个处理器同时从各自的缓存中读取变量i,分别进行+1操作,然后分别写入到系统内从中。 想要保证改写共享变量的操作是原子的,那就必须保证CPU1读改写共享变量的 时候,CPU2不能操作缓存了该共享变量内存地址的缓存。 处理器使用总线锁来解决这个问题,所谓总线锁,就是处理器提供一个LOCK#信号,当一个处理器在总线上输出此信号时, 其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。 使用缓存锁保证原子性 第二个机制是通过缓存锁定来保证原子性。 同一时刻,我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把CPU 和内存之间的通信锁住,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁的开销比较大。 使用缓存锁定开销会变小,缓存锁定,是指内存区域如果被缓存在处理器缓存行中,并且在Lock操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性来保证操作的原子性。 Java语言层面上实现原子操作 在Java中通过锁和循环CAS的方式实现原子操作 使用循环CAS实现原子操作 JVM中的CAS操作正式利用了处理器提供的cmpxchg指令实现的,自旋基本思路就是循环进行CAS操作知道成功为止,以下代码实现可一个基于CAS线程安全的计数器方法safeCount 和一个非线程安全的计数器count。 package com.imooc.item; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class Counter { private AtomicInteger atomicInteger = new AtomicInteger(0); private int i = 0; public void safeCount() { for (; ; ) { int i = atomicInteger.get(); boolean suc = atomicInteger.compareAndSet(i, ++i); if (suc) { break; } } } //线程计数器 public void count() { i++; } public static void main(String[] args) { final Counter cas = new Counter(); List<Thread> ts = new ArrayList<Thread>(600); long start = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 10000; i++) { cas.count(); cas.safeCount(); } } }); ts.add(t); } //开始运行线程 for (Thread t : ts) { t.start(); } //等待所有线程执行完毕 for (Thread t : ts) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(cas.i); System.out.println(cas.atomicInteger.get()); System.out.println(System.currentTimeMillis() - start); } } 运行结果如下所示: 996309 1000000 61 CAS实现原子操作的三大问题 ABA 问题 因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则 更新,但如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会 发现它的值没有发生变化,但实际上却变了,ABA问题的解决思路就是使用版 本号,在变量前面加上版本号,每次变量更新的时候把版本号加1。 那么A->B->A问题就会变成1A-2B-3A. 循环时间开销大 自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。常用做法是控制自旋的次数。 只能保证一个共享变量的原子操作 当对一个共享变量执行操作时,我们可以使用循环CAS的方式保证原子操作, 但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候可 以用锁。 使用锁机制实现原子操作 锁机制保证了只有获得锁的线程才能够 操作锁定的线程共享区域(临界区), JVM内部实现了很多的锁机制:偏向锁、轻量级锁和互斥锁。其中,除了偏向 锁,JVM实现锁的方式都使用了循环CAS,即当一个线程想进入同步块的时候使 用循环CAS的方式获取锁,当它退出同步块的时候使用循环CAS释放锁。

优秀的个人博客,低调大师

Java并发编程实战系列10之避免活跃性危险

10.1 死锁 哲学家问题 有环 A等B,B等A 数据库往往可以检测和解决死锁//TODO JVM不行,一旦死锁只有停止重启。 下面分别介绍了几种典型的死锁情况: 10.1.1 Lock ordering Deadlocks 下面是一个经典的锁顺序死锁:两个线程用不同的顺序来获得相同的锁,如果按照锁的请求顺序来请求锁,就不会发生这种循环依赖的情况。 public class LeftRightDeadlock { private final Object left = new Object(); private final Object right = new Object(); public void leftRight() { synchronized (left) { synchronized (right) { doSomething(); } } } public void rightLeft() { synchronized (right) { synchronized (left) { doSomethingElse(); } } } void doSomething() { } void doSomethingElse() { } } 10.1.1 Dynamic Lock Order Deadlocks 下面的转账例子,如果一个线程X向Y转,而另外一个线程Y向X也转,那么就会发生死锁。 public class DynamicOrderDeadlock { // Warning: deadlock-prone! public static void transferMoney(Account fromAccount, Account toAccount, DollarAmount amount) throws InsufficientFundsException { synchronized (fromAccount) { synchronized (toAccount) { if (fromAccount.getBalance().compareTo(amount) < 0) throw new InsufficientFundsException(); else { fromAccount.debit(amount); toAccount.credit(amount); } } } } static class DollarAmount implements Comparable<DollarAmount> { // Needs implementation public DollarAmount(int amount) { } public DollarAmount add(DollarAmount d) { return null; } public DollarAmount subtract(DollarAmount d) { return null; } public int compareTo(DollarAmount dollarAmount) { return 0; } } static class Account { private DollarAmount balance; private final int acctNo; private static final AtomicInteger sequence = new AtomicInteger(); public Account() { acctNo = sequence.incrementAndGet(); } void debit(DollarAmount d) { balance = balance.subtract(d); } void credit(DollarAmount d) { balance = balance.add(d); } DollarAmount getBalance() { return balance; } int getAcctNo() { return acctNo; } } static class InsufficientFundsException extends Exception { } } 解决办法还是顺序话锁,考虑针对两种情况取hashcode然后判断if-else里面决定锁顺序。 class Helper { public void transfer() throws InsufficientFundsException { if (fromAcct.getBalance().compareTo(amount) < 0) throw new InsufficientFundsException(); else { fromAcct.debit(amount); toAcct.credit(amount); } } } int fromHash = System.identityHashCode(fromAcct); int toHash = System.identityHashCode(toAcct); if (fromHash < toHash) { synchronized (fromAcct) { synchronized (toAcct) { new Helper().transfer(); } } } else if (fromHash > toHash) { synchronized (toAcct) { synchronized (fromAcct) { new Helper().transfer(); } } } else { synchronized (tieLock) { synchronized (fromAcct) { synchronized (toAcct) { new Helper().transfer(); } } } } 10.1.3 在协作对象之间发生死锁Deadlocks Between Cooperating Objects 下面的例子setLocation和getImage都会获取两把锁,会存在两个线程按照不同的顺序获取锁的情况。 public class CooperatingDeadlock { // Warning: deadlock-prone! class Taxi { @GuardedBy("this") private Point location, destination; private final Dispatcher dispatcher; public Taxi(Dispatcher dispatcher) { this.dispatcher = dispatcher; } public synchronized Point getLocation() { return location; } public synchronized void setLocation(Point location) { this.location = location; if (location.equals(destination)) dispatcher.notifyAvailable(this); } public synchronized Point getDestination() { return destination; } public synchronized void setDestination(Point destination) { this.destination = destination; } } class Dispatcher { @GuardedBy("this") private final Set<Taxi> taxis; @GuardedBy("this") private final Set<Taxi> availableTaxis; public Dispatcher() { taxis = new HashSet<Taxi>(); availableTaxis = new HashSet<Taxi>(); } public synchronized void notifyAvailable(Taxi taxi) { availableTaxis.add(taxi); } public synchronized Image getImage() { Image image = new Image(); for (Taxi t : taxis) image.drawMarker(t.getLocation()); return image; } } class Image { public void drawMarker(Point p) { } } } 10.1.4 开放调用 减小锁的力度,锁不嵌套。 class CooperatingNoDeadlock { @ThreadSafe class Taxi { @GuardedBy("this") private Point location, destination; private final Dispatcher dispatcher; public Taxi(Dispatcher dispatcher) { this.dispatcher = dispatcher; } public synchronized Point getLocation() { return location; } public synchronized void setLocation(Point location) { boolean reachedDestination; synchronized (this) { this.location = location; reachedDestination = location.equals(destination); } if (reachedDestination) dispatcher.notifyAvailable(this); } public synchronized Point getDestination() { return destination; } public synchronized void setDestination(Point destination) { this.destination = destination; } } @ThreadSafe class Dispatcher { @GuardedBy("this") private final Set<Taxi> taxis; @GuardedBy("this") private final Set<Taxi> availableTaxis; public Dispatcher() { taxis = new HashSet<Taxi>(); availableTaxis = new HashSet<Taxi>(); } public synchronized void notifyAvailable(Taxi taxi) { availableTaxis.add(taxi); } public Image getImage() { Set<Taxi> copy; synchronized (this) { copy = new HashSet<Taxi>(taxis); } Image image = new Image(); for (Taxi t : copy) image.drawMarker(t.getLocation()); return image; } } class Image { public void drawMarker(Point p) { } } } 1.0.15 资源死锁 数据库连接池,A持有数据库D1连接,等待与D2连接,B持有D2的连接,等待与D1连接。 线程饥饿死锁,如8.1.1小节的例子。 10.2 死锁的避免与诊断 10.2.1 支持定时的锁 tryLock 10.2.2 kill -3 发信号给JVM dump线程 10.3 其他活跃性危险 10.3.1 饥饿 10.3.3 活锁Livelock 他不会阻塞线程,但是也不能继续执行,因为线程在不断的重复执行相同的操作,而且总会失败。 例如处理事务消,回滚后再次重新把任务放在队头。 又例如发送数据包,都选择1s后重试,那么总会冲突,所以可以考虑一个随机数时间间隔。

优秀的个人博客,低调大师

Java并发编程实战系列8之线程池的使用

ThreadPoolExecutor UML图: image image 8.1 在任务和执行策略之间隐形耦合 避免Thread starvation deadlock 8.2 设置线程池大小 8.3 配置ThreadPoolExecutor image 构造函数如下: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... } 核心和最大池大小:如果运行的线程少于 corePoolSize,则创建新线程来处理请求(即一个Runnable实例),即使其它线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。 保持活动时间:如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。 排队:如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列BlockingQueue,而不添加新的线程。 被拒绝的任务:当 Executor 已经关闭,或者队列已满且线程数量达到maximumPoolSize时(即线程池饱和了),请求将被拒绝。这些拒绝的策略叫做Saturation Policy,即饱和策略。包括AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy. 另外注意: 如果运行的线程少于 corePoolSize,ThreadPoolExecutor 会始终首选创建新的线程来处理请求;注意,这时即使有空闲线程也不会重复使用(这和数据库连接池有很大差别)。 如果运行的线程等于或多于 corePoolSize,则 ThreadPoolExecutor 会将请求加入队列BlockingQueue,而不添加新的线程(这和数据库连接池也不一样)。 如果无法将请求加入队列(比如队列已满),则创建新的线程来处理请求;但是如果创建的线程数超出 maximumPoolSize,在这种情况下,请求将被拒绝。 newCachedThreadPool使用了SynchronousQueue,并且是无界的。 线程工厂ThreadFactory 8.4 扩展ThreadPoolExecutor 重写beforeExecute和afterExecute方法。 8.5 递归算法的并行化 实际就是类似Number of Islands或者N-Queens等DFS问题的一种并行处理。 串行版本如下: public class SequentialPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final Set<P> seen = new HashSet<P>(); public SequentialPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; } public List<M> solve() { P pos = puzzle.initialPosition(); return search(new PuzzleNode<P, M>(pos, null, null)); } private List<M> search(PuzzleNode<P, M> node) { if (!seen.contains(node.pos)) { seen.add(node.pos); if (puzzle.isGoal(node.pos)) return node.asMoveList(); for (M move : puzzle.legalMoves(node.pos)) { P pos = puzzle.move(node.pos, move); PuzzleNode<P, M> child = new PuzzleNode<P, M>(pos, move, node); List<M> result = search(child); if (result != null) return result; } } return null; } } 并行版本如下: public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // block until solution found PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown(); } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); } public void run() { if (solution.isSet() || seen.putIfAbsent(pos, true) != null) return; // already solved or seen this position if (puzzle.isGoal(pos)) solution.setValue(this); else for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } }

优秀的个人博客,低调大师

监控服务器ssh登录,并发送报警邮件

最近想监控下云主机的ssh登录情况,所以开始写ssh登录报警监控。实现方式并不难。 一:邮箱申请开启SMTP 在邮箱中选择“设置”----->“账户” 在如下图处开启POP3/SMTP服务,并生成授权码。 二:修改相关参数 登录要进行ssh登录监控的服务器,在/etc/ssh创建"sshrc"文件: #!/bin/bash #获取登录者的用户名 user=$USER #获取登录者的IP地址 ip=${SSH_CLIENT%% *} #获取登录的时间 time=$(date +%F%t%k:%M) #服务器的IP地址 hostname=$(hostname) echo "content=$time,$user,$ip,$hostname" > log python /etc/ssh/testEmail.py "$time" "$user" "$ip" "$hostname" 并在如上自定义路径中创建“testEmail.py”文件。 添加颜色部分,按邮箱,授权码,以及相关术语进行修改。 #!/usr/bin/python # -*- coding: UTF-8 -*- import smtplib from email import encoders from email.header import Header from email.mime.text import MIMEText from email.utils import parseaddr, formataddr import sys def send_mail(dtime,duser,dip,dhostname): #基础信息 # from_addr = input("From:") from_addr = "yaohong@qq.com" password = "授权码" #to_addr = from_addr to_addr = "yaohong@qq.com" # password = raw_input("Password:") # to_addr = input("To:") def _format_addr(s): name, addr = parseaddr(s) return formataddr((Header(name, 'utf-8').encode(), addr)) smtp_server = "smtp.qq.com" mimetex = '您的机器:',dhostname,',于:',dtime,',被IP:',dip,'以账号',duser,'进行登录,请确认是否为公司员工。' #构造邮件 msg = MIMEText(''.join(mimetex), 'plain', 'utf-8') msg['From'] = _format_addr("yaohong") msg['To'] = _format_addr("yaohong@qq.com") msg['Subject'] = Header("来自yaohong", 'utf-8').encode() #发送邮件 server = smtplib.SMTP_SSL(smtp_server, 465) server.set_debuglevel(1) server.login(from_addr, password) server.sendmail(from_addr, [to_addr], msg.as_string()) server.quit() if __name__ == "__main__": send_mail(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]) 三:登录检测 安装完后,再次进行ssh登录该服务器会收到邮件如下图,则表示ssh被监控成功。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册