首页 文章 精选 留言 我的

精选列表

搜索[高并发],共10000篇文章
优秀的个人博客,低调大师

python并发模块之concurrent.futures(一)

Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持,他属于上层的封装,对于用户来说,不用在考虑那么多东西了。 官方参考资料:https://pythonhosted.org/futures/ 1.Executor Exectuor是基础模块,这是一个抽象类,其子类分为ThreadPoolExecutor和ProcessPoolExecutor,分别被用来创建线程池和进程池。 提供的方法如下: Executor.submit(fn, *args, **kwargs) fn:为需要异步执行的函数 args,kwargs:为给函数传递的参数 就来看看官网的这个例子: withThreadPoolExecutor(max_workers=1)asexecutor: future=executor.submit(pow,323,1235) print(future.result()) 我们使用submit方法来往线程池中加入一个task(pow函数),submit返回一个Future对象。其中future.result()的result方法的作用是拿到调用返回的结果。如果没有执行完毕就会去等待。这里我们使用with操作符,使得当任务执行完成之后,自动执行shutdown函数,而无需编写相关释放代码。 关于更多future的具体方法说明看后面的future部分解释。 Executor.map(fn, *args, **kwargs) map(func, *iterables, timeout=None) 此map函数和python自带的map函数功能类似,只不过concurrent模块的map函数从迭代器获得参数后异步执行。并且,每一个异步操作,能用timeout参数来设置超时时间,timeout的值可以是int或float型,如果操作timeout的话,会raisesTimeoutError。如果timeout参数不指定的话,则不设置超时间。 func:为需要异步执行的函数 iterables:可以是一个能迭代的对象. timeout:设置每次异步操作的超时时间 fromconcurrent.futuresimportThreadPoolExecutorimportrequests URLS=['http://www.163.com','https://www.baidu.com/','https://github.com/']defload_url(url): req=requests.get(url,timeout=60) print('%rpageis%dbytes'%(url,len(req.content))) executor=ThreadPoolExecutor(max_workers=3) executor.map(load_url,URLS) print('主线程结束') submit函数和map函数,根据需要,选一个使用即可。 Executor.shutdown(wait=True) 此函数用于释放异步执行操作后的系统资源。Executor实现了enter__和__exit使得其对象可以使用with操作符。 在这里可以使用with上下文关键字代替,如上面第一个submit的例子。 2.Future对象 submit函数返回future对象,future提供了跟踪任务执行状态的方法,Future实例可以被Executor.submit()方法创建。除了测试之外不应该直接创建。 cancel():尝试去取消调用。如果调用当前正在执行,不能被取消。这个方法将返回False,否则调用将会被取消,方法将返回True cancelled():如果调用被成功取消返回True running():如果当前正在被执行不能被取消返回True done():如果调用被成功取消或者完成running返回True result(Timeout = None):拿到调用返回的结果。如果没有执行完毕就会去等待 exception(timeout=None):捕获程序执行过程中的异常 add_done_callback(fn):将fn绑定到future对象上。当future对象被取消或完成运行时,fn函数将会被调用 3.wait方法 wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默认设置为ALL_COMPLETED。 如果采用默认的ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成,再执行主线程: #!/usr/bin/envpython#encoding:utf-8fromconcurrent.futuresimportThreadPoolExecutor,wait,as_completedimportrequests URLS=['http://www.163.com','https://www.baidu.com/','https://github.com/']defload_url(url): req=requests.get(url,timeout=60) print('%rpageis%dbytes'%(url,len(req.content))) executor=ThreadPoolExecutor(max_workers=3) f_list=[]forurlinURLS: future=executor.submit(load_url,url) f_list.append(future) print(wait(f_list)) print('主线程结束') 如果采用FIRST_COMPLETED参数,程序并不会等到线程池里面所有的任务都完成。 fromconcurrent.futuresimportThreadPoolExecutor,wait,as_completedimportrequests URLS=['http://www.163.com','https://www.baidu.com/','https://github.com/']defload_url(url): req=requests.get(url,timeout=60) print('%rpageis%dbytes'%(url,len(req.content))) executor=ThreadPoolExecutor(max_workers=3) f_list=[]forurlinURLS: future=executor.submit(load_url,url) f_list.append(future) print(wait(f_list,return_when='FIRST_COMPLETED')) print('主线程结束') 关于模块的基本使用就是上面的这些。后续会做一些拓展或者案例。

优秀的个人博客,低调大师

Java并发编程之线程安全、线程通信

Java多线程开发中最重要的一点就是线程安全的实现了。所谓Java线程安全,可以简单理解为当多个线程访问同一个共享资源时产生的数据不一致问题。为此,Java提供了一系列方法来解决线程安全问题。 synchronized synchronized用于同步多线程对共享资源的访问,在实现中分为同步代码块和同步方法两种。 同步代码块 1 public class DrawThread extends Thread { 2 3 private Account account; 4 private double drawAmount; 5 public DrawThread(String name, Account account, double drawAmount) { 6 super(name); 7 this.account = account; 8 this.drawAmount = drawAmount; 9 } 10 @Override 11 public void run() { 12 //使用account作为同步代码块的锁对象 13 synchronized(account) { 14 if (account.getBalance() >= drawAmount) { 15 System.out.println(getName() + "取款成功, 取出:" + drawAmount); 16 try { 17 TimeUnit.MILLISECONDS.sleep(1); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 account.setBalance(account.getBalance() - drawAmount); 22 System.out.println("余额为: " + account.getBalance()); 23 } else { 24 System.out.println(getName() + "取款失败!余额不足!"); 25 } 26 } 27 } 28 } 同步方法 使用同步方法,即使用synchronized关键字修饰类的实例方法或类方法,可以实现线程安全类,即该类在多线程访问中,可以保证可变成员的数据一致性。 同步方法中,隐式的锁对象由锁的是实例方法还是类方法确定,分别为该类对象或类的Class对象。 1 public class SyncAccount { 2 private String accountNo; 3 private double balance; 4 //省略构造器、getter setter方法 5 //在一个简单的账户取款例子中, 通过添加synchronized的draw方法, 把Account类变为一个线程安全类 6 public synchronized void draw(double drawAmount) { 7 if (balance >= drawAmount) { 8 System.out.println(Thread.currentThread().getName() + "取款成功, 取出:" + drawAmount); 9 try { 10 TimeUnit.MILLISECONDS.sleep(1); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 balance -= drawAmount; 15 System.out.println("余额为: " + balance); 16 } else { 17 System.out.println(Thread.currentThread().getName() + "取款失败!余额不足!"); 18 } 19 } 20 //省略HashCode和equals方法 21 } 同步锁(Lock、ReentrantLock) Java5新增了两个用于线程同步的接口Lock和ReadWriteLock,并且分别提供了两个实现类ReentrantLock(可重入锁)和ReentrantReadWriteLock(可重入读写锁)。 相比较synchronized,ReentrantLock的一些优势功能: 1. 等待可中断:指持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待。 2. 公平锁:多个线程等待同一个锁时,必须按照申请锁的时间顺序依次获取。synchronized是非公平锁,ReentrantLock可以通过参数设置为公平锁 3. 多条件锁:ReentrantLock可通过Condition类获取多个条件关联 Java 1.6以后,synchronized性能提升较大,因此一般的开发中依然建议使用语法层面上的synchronized加锁。 Java8新增了更为强大的可重入读写锁StampedLock类。 比较常用的是ReentrantLock类,可以显示地加锁、释放锁。下面使用ReentrantLock重构上面的SyncAccount类。 1 public class RLAccount { 2 //定义锁对象 3 private final ReentrantLock lock = new ReentrantLock(); 4 private String accountNo; 5 private double balance; 6 //省略构造方法和getter setter 7 public void draw(double drawAmount) { 8 //加锁 9 lock.lock(); 10 try { 11 if (balance >= drawAmount) { 12 System.out.println(Thread.currentThread().getName() + "取款成功, 取出:" + drawAmount); 13 try { 14 TimeUnit.MILLISECONDS.sleep(1); 15 } catch (InterruptedException e) { 16 e.printStackTrace(); 17 } 18 balance -= drawAmount; 19 System.out.println("余额为: " + balance); 20 } else { 21 System.out.println(Thread.currentThread().getName() + "取款失败!余额不足!"); 22 } 23 } finally { 24 //通过finally块保证释放锁 25 lock.unlock(); 26 } 27 } 28 } 死锁 当两个线程相互等待地方释放锁的时候,就会产生死锁。关于死锁和线程安全的深入分析,将另文介绍。 线程通信方式之wait、notify、notifyAll Object类提供了三个用于线程通信的方法,分别是wait、notify和notifyAll。这三个方法必须由同步锁对象来调用,具体来说: 1. 同步方法:因为同步方法默认使用所在类的实例作为锁,即this,可以在方法中直接调用。 2. 同步代码块:必须由锁来调用。 wait():导致当前线程等待,直到其它线程调用锁的notify方法或notifyAll方法来唤醒该线程。调用wait的线程会释放锁。 notify():唤醒任意一个在等待的线程 notifyAll():唤醒所有在等待的线程 1 /* 2 * 通过一个生产者-消费者队列来说明线程通信的基本使用方法 3 * 注意: 假如这里的判断条件为if语句,唤醒方法为notify, 那么如果分别有多个线程操作入队\出队, 会导致线程不安全. 4 */ 5 public class EventQueue { 6 7 private final int max; 8 9 static class Event{ 10 11 } 12 //定义一个不可改的链表集合, 作为队列载体 13 private final LinkedList<Event> eventQueue = new LinkedList<>(); 14 15 private final static int DEFAULT_MAX_EVENT = 10; 16 17 public EventQueue(int max) { 18 this.max = max; 19 } 20 21 public EventQueue() { 22 this(DEFAULT_MAX_EVENT); 23 } 24 25 private void console(String message) { 26 System.out.printf("%s:%s\n",Thread.currentThread().getName(), message); 27 } 28 //定义入队方法 29 public void offer(Event event) { 30 //使用链表对象作为锁 31 synchronized(eventQueue) { 32 //在循环中判断如果队列已满, 则调用锁的wait方法, 使线程阻塞 33 while(eventQueue.size() >= max) { 34 try { 35 console(" the queue is full"); 36 eventQueue.wait(); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 } 41 console(" the new event is submitted"); 42 eventQueue.addLast(event); 43 this.eventQueue.notifyAll(); 44 } 45 } 46 //定义出队方法 47 public Event take() { 48 //使用链表对象作为锁 49 synchronized(eventQueue) { 50 //在循环中判断如果队列已空, 则调用锁的wait方法, 使线程阻塞 51 while(eventQueue.isEmpty()) { 52 try { 53 console(" the queue is empty."); 54 eventQueue.wait(); 55 } catch (InterruptedException e) { 56 e.printStackTrace(); 57 } 58 } 59 Event event = eventQueue.removeFirst(); 60 this.eventQueue.notifyAll(); 61 console(" the event " + event + " is handled/taked."); 62 return event; 63 } 64 } 65 } 线程通信方式之Condition 如果使用的是Lock接口实现类来同步线程,就需要使用Condition类的三个方法实现通信,分别是await、signal和signalAll,使用上与Object类的通信方法基本一致。 1 /* 2 * 使用Lock接口和Condition来实现生产者-消费者队列的通信 3 */ 4 public class ConditionEventQueue { 5 //显示定义Lock对象 6 private final Lock lock = new ReentrantLock(); 7 //通过newCondition方法获取指定Lock对象的Condition实例 8 private final Condition cond = lock.newCondition(); 9 private final int max; 10 static class Event{ } 11 //定义一个不可改的链表集合, 作为队列载体 12 private final LinkedList<Event> eventQueue = new LinkedList<>(); 13 private final static int DEFAULT_MAX_EVENT = 10; 14 public ConditionEventQueue(int max) { 15 this.max = max; 16 } 17 18 public ConditionEventQueue() { 19 this(DEFAULT_MAX_EVENT); 20 } 21 22 private void console(String message) { 23 System.out.printf("%s:%s\n",Thread.currentThread().getName(), message); 24 } 25 //定义入队方法 26 public void offer(Event event) { 27 lock.lock(); 28 try { 29 //在循环中判断如果队列已满, 则调用cond的wait方法, 使线程阻塞 30 while (eventQueue.size() >= max) { 31 try { 32 console(" the queue is full"); 33 cond.await(); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 } 38 console(" the new event is submitted"); 39 eventQueue.addLast(event); 40 cond.signalAll();; 41 } finally { 42 lock.unlock(); 43 } 44 45 } 46 //定义出队方法 47 public Event take() { 48 lock.lock(); 49 try { 50 //在循环中判断如果队列已空, 则调用cond的wait方法, 使线程阻塞 51 while (eventQueue.isEmpty()) { 52 try { 53 console(" the queue is empty."); 54 cond.wait(); 55 } catch (InterruptedException e) { 56 e.printStackTrace(); 57 } 58 } 59 Event event = eventQueue.removeFirst(); 60 cond.signalAll(); 61 console(" the event " + event + " is handled/taked."); 62 return event; 63 } finally { 64 lock.unlock(); 65 } 66 } 67 } Java 1.5开始就提供了BlockingQueue接口,来实现如上所述的生产者-消费者线程同步工具。具体介绍将另文说明。

优秀的个人博客,低调大师

(十五)Java并发性和多线程-死锁

死锁是两个或更多线程阻塞着等待其它处于死锁状态的线程所持有的锁。死锁通常发生在多个线程同时但以不同的顺序请求同一组锁的时候。 例如,如果线程1锁住了A,然后尝试对B进行加锁,同时线程2已经锁住了B,接着尝试对A进行加锁,这时死锁就发生了。线程1永远得不到B,线程2也永远得不到A,并且它们永远也不会知道发生了这样的事情。为了得到彼此的对象(A和B),它们将永远阻塞下去。这种情况就是一个死锁。 该情况如下: Thread 1 locks A, waits for B Thread 2 locks B, waits for A 一个简单的死锁类 当DeadLock类的对象flag==1时(td1),先锁定o1,睡眠500毫秒 而td1在睡眠的时候另一个flag==0的对象(td2)线程启动,先锁定o2,睡眠500毫秒 td1睡眠结束后需要锁定o2才能继续执行,而此时o2已被td2锁定; td2睡眠结束后需要锁定o1才能继续执行,而此时o1已被td1锁定; td1、td2相互等待,都需要得到对方锁定的资源才能继续执行,从而死锁。 import lombok.extern.slf4j.Slf4j; @Slf4j public class DeadLock implements Runnable { public int flag = 1; //静态对象是类的所有对象共享的 private static Object o1 = new Object(), o2 = new Object(); @Override public void run() { log.info("flag:{}", flag); if (flag == 1) { synchronized (o1) { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } synchronized (o2) { log.info("1"); } } } if (flag == 0) { synchronized (o2) { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } synchronized (o1) { log.info("0"); } } } } public static void main(String[] args) { DeadLock td1 = new DeadLock(); DeadLock td2 = new DeadLock(); td1.flag = 1; td2.flag = 0; //td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。 //td2的run()可能在td1的run()之前运行 new Thread(td1).start(); new Thread(td2).start(); } }

优秀的个人博客,低调大师

GO------小白之并发聊天室

因为没有写客户端、可以在cmd中利用 nc -u 来充当客户端 广播用户上线: 1、主go程中创建socket、defer 2、循环监听客户端连接请求 3、有一个客户端链接、创建新go程处理客户数据 4、组织用户相关信息、全局变量(结构体、map、channel) 5、Hadlconn、初始化新用户结构体信息、获取客户端IP和port、初始化新用户结构体信息、 name==addr 6、创建manager管理go程、要在for循环accpet之前、实现manager、初始化在线用户map.循环读取全局的channel、如果无数据阻塞、如果有数据遍历在线用户 map、将数据写到用户的channel中 7、新用户添加到map中、key==ip+port value==结构体 8、创建WriteToClient go程专门给当前用户发送消息、遍历自带C、读数据、Conn.Write写到客户端 9、Hadlconn中。结束位置、组织用户上线信息、将用户上线信息写到全局channel--Maneger被激 活 10、Hadlconn结尾处要有for循环、不然直接终止 有问题的地方都已经注释 接下来要在基础框架上增加小功能 发送消息内容: 1、封装函数来广播用户传的消息,传到massage中 2、起匿名go程、主要是来读取客户端发送的内容、写到全局massage中 3、for 循环读取客户端发送的内容 4、写给全局massage 查询在线用户: 将buf里减去一个“\n” 判断是否为who,如果为who、遍历map ,将名字利用conn.Write写到当前客户端 修改用户名: 1、将读取到的masg进行判断是否为rename 2、提取rename| 后的字符串、存入到client.name成员中 3、更新在线用户列表 onlinemap--key-----ip+port 4、提示用户更新成功 用户退出: 1、在用户登录成功后、创建监听用户退出的channel---isQuit 2、当conn.read==0时、isQuit<-true 3、在handleConnect结尾for中、添加select监听<-isQuit 4、条件满足、将用户从在线列表移除、阻止用户下线消息、写入massage通道 超时强踢: 1、在select中添加监听定时器time.After,计时到达将用户从在线列表移除、阻止用户下线消息、写入 massage通道 2、判断用户是否活跃,创建监听活跃的channel、只要用户执行聊天改名查询任意操作,向此channel 写数据 3、在select添加用户活跃度的监听、条件满足、不作为,目的是重置上面创建的计时器 这些功能其实都是在HandleConn函数中实现的 Func HandleConn(connnet.Conn){ defer conn.Close() //初始化结构体 addr:=conn.RemoteAddr().String() client:=Client{make(chanstring),addr,addr} Olinemap[addr]=client msg:=Mkmas(client,"login") //将消息传到全局通道发送给其他人 goWriteToClient(client,conn) Massage<-msg quit:=make(chanbool) datahas:=make(chanbool) //匿名函数主要做的是客户端写入的数据 buf:=make([]byte,1024) go func() { for{ n,err:=conn.Read(buf) ifn==0{ quit<-true fmt.Println("客户端退出") return } iferr!=nil{ fmt.Println("conn.Read error",err) return } msg:=string(buf[:n-1]) ifmsg=="who"&&len(msg)==3{ conn.Write([]byte("user list:")) for_,value:=rangeOlinemap{ Massage<-value.Addr+value.Name } }else iflen(msg)>=8&&msg[:6]=="rename"{ newname:=msg[7:] client.Name=newname Olinemap[addr]=client Olinemap[addr].Name=msg[7:,go语言里不支持这样修改结构体中的数据 conn.Write([]byte("rename successfully")) }else{ info:=Mkmas(client,msg) Massage<-info } datahas<-true //注意位置、如果客户端改名、who命令都算活跃 } }() for{ select{ case<-quit: //确定退出、n==0时 close(client.C) //关闭WriteToClient函数 delete(Olinemap,addr) str:=Mkmas(client,"exit") fmt.Println(str,"exit") Massage<-str return //return当前go程返回、但是中间创建的go程不会停止。所以要将WriteToClient关闭 case<-datahas: //如果客户端活跃则不做操作,这个是为了让下面的超时退出重置 case<-time.After(time.Second*10): delete(Olinemap,addr) str:=Mkmas(client,"timeout") Massage<-str fmt.Println(str,"timeout") return } } }

优秀的个人博客,低调大师

(二)Java并发学习笔记--安全发布对象

逸出的方式 上边关于逸出的概念讲述的很是模糊,下面列举几个逸出的示例。 通过静态变量引用逸出 public static Set<Secret> knownSecrets; public void initialize() { knowsSecrets = new HashSet<Secret>(); } 上边代码示例中,调用initialize方法,发布了knowSecrets对象。当你向knowSecrets中添加一个Secret时,会同时将Secret对象发布出去,原因是可以通过遍历knowSecrets获取到Secret对象的引用,然后进行修改。 通过非静态(私有)方法 class UnsafeStates { private String[] states = new String[]{"AK", "AL"}; public String[] getStates() { return states; } } 以这种方式发布的states会出问题,任何一个调用者都能修改它的内容。数组states已经逸出了它所属的范围,这个本应该私有的数据,事实上已经变成共有的了。 this逸出 public class ThisEscape { public ThisEscape(EventSource source) { source.registerListener(new EventListener() { public void onEvent(Event e) { doSomething(e); } }); } } 在上边代码中,当我们实例化ThisEscape对象时,会调用source的registerListener方法时,便启动了一个线程,而且这个线程持有了ThisEscape对象(调用了对象的doSomething方法),但此时ThisEscape对象却没有实例化完成(还没有返回一个引用),所以我们说,此时造成了一个this引用逸出,即还没有完成的实例化ThisEscape对象的动作,却已经暴露了对象的引用,使其他线程可以访问还没有构造好的对象,可能会造成意料不到的问题。 通过上述示例,个人理解,对逸出的概念应该定义为: 一个对象,超出了它原本的作用域,而可以被其它对象进行修改,而这种修改及修改的结果是无法预测的。换句话说:一个对象发布后,它的状态应该是稳定的,修改是可被检测到的。如果在其它线程修改(或做其它操作)一个对象后导致对象的状态未知,就可以说这个对象逸出了。 总之,一个对象逸出后,不论其它线程或对象是否使用这个逸出的对象都不重要,重要的是,被误用及被误用后的未知结果的风险总是存在的。 PS 书中给出了避免this逸出的方法: public class SafeListener { private final EventListener listener; private SafeListener() { listener = new EventListener() { public void onEvent(Event e) { doSomething(e); } }; } public static SafeListener newInstance(EventSource source) { SafeListener safe = new SafeListener(); source.registerListener(safe.listener); return safe; } } 在这个构造中,我们看到的最大的一个区别就是:当构造好了SafeListener对象之后,我们才启动了监听线程,也就确保了SafeListener对象是构造完成之后在使用的SafeListener对象。 对于这样的技术,书里面也有这样的注释: 具体来说,只有当构造函数返回时,this引用才应该从线程中逸出。构造函数可以将this引用保存到某个地方,只要其他线程不会在构造函数完成之前使用它。 安全发布对象 在静态初始化函数中初始化一个对象引用 将对象的应用保存到volatile类型的域或者AtomicReferance对象中 将对象的引用保存到某个正确构造对象的final类型域中 将对象的引用保存到一个由锁保护的域中。 /** * 懒汉模式(线程不安全) * 单例实例在第一次使用时进行创建 */ @NotThreadSafe public class SingletonExample1 { // 私有构造函数 private SingletonExample1() { } // 单例对象 private static SingletonExample1 instance = null; // 静态的工厂方法 public static SingletonExample1 getInstance() { // 这里同时有两个线程进入就可能同时初始化两个对象 if (instance == null) { instance = new SingletonExample1(); } return instance; } } 懒汉模式本身是线程不安全的,如果想要实现线程安全可以通过synchronized关键字实现: /** * 懒汉模式 * 单例实例在第一次使用时进行创建 */ @ThreadSafe @NotRecommend public class SingletonExample3 { // 私有构造函数 private SingletonExample3() { } // 单例对象 private static SingletonExample3 instance = null; // 静态的工厂方法 public static synchronized SingletonExample3 getInstance() { if (instance == null) { instance = new SingletonExample3(); } return instance; } } 但此中方式不推荐使用,应该它通过同一时间内只允许一个线程来访问的方式实现线程安全,但是却带来了性能上面的开销。 我们可以通过以下方式来实现线程安全: 懒汉模式 -》 volatile + 双重同步锁单例模式 /** * 懒汉模式 -》 双重同步锁单例模式 * 单例实例在第一次使用时进行创建 */ @ThreadSafe public class SingletonExample4 { // 私有构造函数 private SingletonExample4() { } // 1、memory = allocate() 分配对象的内存空间 // 2、ctorInstance() 初始化对象 // 3、instance = memory 设置instance指向刚分配的内存 // JVM和cpu优化,发生了指令重排(多线程 ) // 1、memory = allocate() 分配对象的内存空间 // 3、instance = memory 设置instance指向刚分配的内存 // 2、ctorInstance() 初始化对象 // 单例对象 volatile + 双重检测机制 -> 禁止指令重排 private volatile static SingletonExample4 instance = null; public static SingletonExample4 getInstance() { if (instance == null) { // 双重检测机制 // B synchronized (SingletonExample4.class) { // 同步锁 if (instance == null) { instance = new SingletonExample4(); // A - 3 } } } return instance; } } /** * 饿汉模式 * 单例实例在类装载时进行创建 */ @ThreadSafe public class SingletonExample2 { // 私有构造函数 private SingletonExample2() { } // 单例对象 private static SingletonExample2 instance = new SingletonExample2(); // 静态的工厂方法 public static SingletonExample2 getInstance() { return instance; } } 饿汉模式不会有线程问题,但是在类加载时实例化对象。使用时要考虑两点: 私有构造函数在使用时没有过多的逻辑处理(销毁性能,慢) 这个对象一定会被使用(浪费资源) 在静态代码块中实例化一个对象: /** * 饿汉模式 * 单例实例在类装载时进行创建 */ @ThreadSafe public class SingletonExample6 { // 私有构造函数 private SingletonExample6() { } // 单例对象 private static SingletonExample6 instance = null; static { instance = new SingletonExample6(); } // 静态的工厂方法 public static SingletonExample6 getInstance() { return instance; } public static void main(String[] args) { System.out.println(getInstance().hashCode()); System.out.println(getInstance().hashCode()); } } 枚举模式: /** * 枚举模式:最安全 */ @ThreadSafe @Recommend public class SingletonExample7 { // 私有构造函数 private SingletonExample7() { } public static SingletonExample7 getInstance() { return Singleton.INSTANCE.getInstance(); } private enum Singleton { INSTANCE; private SingletonExample7 singleton; // JVM保证这个方法绝对只调用一次 Singleton() { singleton = new SingletonExample7(); } public SingletonExample7 getInstance() { return singleton; } } }

优秀的个人博客,低调大师

Java并发编程笔记之FutureTask源码分析

FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。 类图结构如下所示: 线程池使用 FutureTask 时候需要注意的一点事,FutureTask 使用不当可能会造成调用线程一直阻塞,如何避免? 线程池使用 FutureTask 的时候如果拒绝策略设置为了DiscardPolicy和DiscardOldestPolicy并且在被拒绝的任务的 Future 对象上调用无参 get 方法那么调用线程会一直被阻塞。 下面先通过一个简单的例子来复现问题,代码如下: public class FutureTest { //(1)线程池单个线程,线程池队列元素个数为1 private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy()); public static void main(String[] args) throws Exception { //(2)添加任务one Future futureOne = executorService.submit(new Runnable() { @Override public void run() { System.out.println("start runable one"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); //(3)添加任务two Future futureTwo = executorService.submit(new Runnable() { @Override public void run() { System.out.println("start runable two"); } }); //(4)添加任务three Future futureThree=null; try { futureThree = executorService.submit(new Runnable() { @Override public void run() { System.out.println("start runable three"); } }); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); } System.out.println("task one " + futureOne.get());//(5)等待任务one执行完毕 System.out.println("task two " + futureTwo.get());//(6)等待任务two执行完毕 System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任务three执行完毕 executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕 } 运行结果如下: 代码 (1) 创建了一个单线程并且队列元素个数为 1 的线程池,并且拒绝策略设置为了DiscardPolicy 代码(2)向线程池提交了一个任务 one,那么这个任务会使用唯一的一个线程进行执行,任务在打印start runable one后会阻塞该线程 5s. 代码(3)向线程池提交了一个任务 two,这时候会把任务 two 放入到阻塞队列 代码(4)向线程池提交任务 three,由于队列已经满了则会触发拒绝策略丢弃任务 three, 从执行结果看在任务 one 阻塞的 5s 内,主线程执行到了代码 (5) 等待任务 one 执行完毕,当任务 one 执行完毕后代码(5)返回,主线程打印出 task one null。任务 one 执行完成后线程池的唯一线程会去队列里面取出任务 two 并执行所以输出 start runable two 然后代码(6)会返回,这时候主线程输出 task two null,然后执行代码(7)等待任务 three 执行完毕,从执行结果看代码(7)会一直阻塞不会返回,至此问题产生,如果把拒绝策略修改为 DiscardOldestPolicy 也会存在有一个任务的 get 方法一直阻塞只是现在是任务 two 被阻塞。但是如果拒绝策略设置为默认的 AbortPolicy 则会正常返回,并且会输出如下结果: 要分析这个问题需要看下线程池的 submit 方法里面做了什么,submit 方法源码如下: public Future<?> submit(Runnable task) { ... //(1)装饰Runnable为Future对象 RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); //(6)返回future对象 return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } public void execute(Runnable command) { ... //(2) 如果线程个数消息核心线程数则新增处理线程处理 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //(3)如果当前线程个数已经达到核心线程数则任务放入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //(4)尝试新增处理线程进行处理 else if (!addWorker(command, false)) reject(command);//(5)新增失败则调用拒绝策略 } 代码(1)装饰 Runnable 为 FutureTask 对象,然后调用线程池的 execute 方法。 代码 (2) 如果线程个数消息核心线程数则新增处理线程处理 代码(3)如果当前线程个数已经达到核心线程数则任务放入队列 代码(4)尝试新增处理线程进行处理,失败则进行代码(5),否者直接使用新线程处理 代码(5)执行具体拒绝策略,从这里也可以看出拒绝策略执行是使用的业务线程。 所以要分析上面例子中问题所在只需要看步骤(5)对被拒绝任务的影响,这里先看下拒绝策略 DiscardPolicy 的源码,如下: public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } 可知拒绝策略 rejectedExecution 方法里面什么都没做,所以代码(4)调用 submit 后会返回一个 future 对象,这里有必要在重新说 future 是有状态的,FutureTask 内部有一个state用来展示任务的状态,并且是volatile修饰的,future 的状态枚举值如下: /** Possible state transitions: * NEW -> COMPLETING -> NORMAL 正常的状态转移 * NEW -> COMPLETING -> EXCEPTIONAL 异常 * NEW -> CANCELLED 取消 * NEW -> INTERRUPTING -> INTERRUPTED 中断 */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; 在代码(1)的时候使用 newTaskFor 方法转换 Runnable 任务为 FutureTask,而 FutureTask 的构造函数里面设置的状态就是 New。FutureTask的构造函数源码如下: public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } 把FutureTask提交到线程池或者线程执行start时候会调用run方法,源码如下: public void run() { //如果当前不是new状态,或者当前cas设置当前线程失败则返回,只有一个线程可以成功。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //当前状态为new 则调用任务的call方法执行任务 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex);//完成NEW -> COMPLETING -> EXCEPTIONAL 状态转移 } //执行任务成功则保存结果更新状态,unpark所有等待线程。 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { //状态从new->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //状态从COMPLETING-》NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //unpark所有等待线程。 finishCompletion(); } } 所以使用 DiscardPolicy 策略提交任务后返回了一个状态值为NEW的future对象。那么我们下面就要看下当future的无参get()方法的时候,future变为什么状态才会返回,这时候就要看一下FutureTask的get方法的源码,源码如下: public V get() throws InterruptedException, ExecutionException { int s = state; //当状态值<=COMPLETING时候需要等待,否者调用report返回 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //如果被中断,则抛异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } //组建单列表 int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); //超时则返回 if (nanos <= 0L) { removeWaiter(q); return state; } //否者设置park超时时间 LockSupport.parkNanos(this, nanos); } else //直接挂起当前线程 LockSupport.park(this); } } private V report(int s) throws ExecutionException { Object x = outcome; //状态值为NORMAL正常返回 if (s == NORMAL) return (V)x; //状态值大于等于CANCELLED则抛异常 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } 也就是说当 future 的状态 > COMPLETING 时候调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素的时候并没有设置该 future 的状态,后面也没有其他机会可以设置该 future 的状态,所以 future 的状态一直是 NEW,所以一直不会返回,同理 DiscardOldestPolicy 策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于 future 的状态。、 在submit任务后还可以调用futuretask的cancel来取消任务: public boolean cancel(boolean mayInterruptIfRunning) { //只有任务是new的才能取消 if (state != NEW) return false; //运行时允许中断 if (mayInterruptIfRunning) { //完成new->INTERRUPTING if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); //完成INTERRUPTING->INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //不允许中断则直接new->CANCELLED else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; } 那么默认的 AbortPolicy 策略为啥没问题呢? 也就是说当 future 的状态 > COMPLETING 时候调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素的时候并没有设置该 future 的状态,后面也没有其他机会可以设置该 future 的状态,所以 future 的状态一直是 NEW,所以一直不会返回,同理 DiscardOldestPolicy 策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于 future 的状态。 所以当使用 Future 的时候,尽量使用带超时时间的 get 方法,这样即使使用了 DiscardPolicy 拒绝策略也不至于一直等待,等待超时时间到了会自动返回的,如果非要使用不带参数的 get 方法则可以重写 DiscardPolicy 的拒绝策略在执行策略时候设置该 Future 的状态大于 COMPLETING 即可,但是查看 FutureTask 提供的方法发现只有 cancel 方法是 public 的并且可以设置 FutureTask 的状态大于 COMPLETING,重写拒绝策略具体代码可以如下: /** * Created by cong on 2018/7/13. */ public class MyRejectedExecutionHandler implements RejectedExecutionHandler { public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { if (!threadPoolExecutor.isShutdown()) { if(null != runnable && runnable instanceof FutureTask){ ((FutureTask) runnable).cancel(true); } } } } 使用这个策略时候由于从 report 方法知道在 cancel 的任务上调用 get() 方法会抛出异常所以代码(7)需要使用 try-catch 捕获异常代码(7)修改为如下: package com.hjc; import java.util.concurrent.*; /** * Created by cong on 2018/7/13. */ public class FutureTest { //(1)线程池单个线程,线程池队列元素个数为1 private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1), new MyRejectedExecutionHandler()); public static void main(String[] args) throws Exception { //(2)添加任务one Future futureOne = executorService.submit(new Runnable() { public void run() { System.out.println("start runable one"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); //(3)添加任务two Future futureTwo = executorService.submit(new Runnable() { public void run() { System.out.println("start runable two"); } }); //(4)添加任务three Future futureThree = null; try { futureThree = executorService.submit(new Runnable() { public void run() { System.out.println("start runable three"); } }); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); } System.out.println("task one " + futureOne.get());//(5)等待任务one执行完毕 System.out.println("task two " + futureTwo.get());//(6)等待任务two执行完毕 try{ System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任务three }catch(Exception e){ System.out.println(e.getLocalizedMessage()); } executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕 } } 运行结果如下: 当然这相比正常情况下多了一个异常捕获,其实最好的情况是重写拒绝策略时候设置 FutureTask 的状态为 NORMAL,但是这需要重写 FutureTask 方法了,因为 FutureTask 并没有提供接口进行设置。

优秀的个人博客,低调大师

Java并发编程笔记之Timer源码分析

timer在JDK里面,是很早的一个API了。具有延时的,并具有周期性的任务,在newScheduledThreadPool出来之前我们一般会用Timer和TimerTask来做,但是Timer存在一些缺陷,为什么这么说呢? Timer只创建唯一的线程来执行所有Timer任务。如果一个timer任务的执行很耗时,会导致其他TimerTask的时效准确性出问题。例如一个TimerTask每10秒执行一次,而另外一个TimerTask每40ms执行一次,重复出现的任务会在后来的任务完成后快速连续的被调用4次,要么完全“丢失”4次调用。Timer的另外一个问题在于,如果TimerTask抛出未检查的异常会终止timer线程。这种情况下,Timer也不会重新回复线程的执行了;它错误的认为整个Timer都被取消了。此时已经被安排但尚未执行的TimerTask永远不会再执行了,新的任务也不能被调度了。 这里做了一个小的 demo 来复现问题,代码如下: package com.hjc; import java.util.Timer; import java.util.TimerTask; /** * Created by cong on 2018/7/12. */ public class TimerTest { //创建定时器对象 static Timer timer = new Timer(); public static void main(String[] args) { //添加任务1,延迟500ms执行 timer.schedule(new TimerTask() { @Override public void run() { System.out.println("---one Task---"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } throw new RuntimeException("error "); } }, 500); //添加任务2,延迟1000ms执行 timer.schedule(new TimerTask() { @Override public void run() { for (;;) { System.out.println("---two Task---"); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }, 1000); } } 如上代码先添加了一个任务在 500ms 后执行,然后添加了第二个任务在 1s 后执行,我们期望的是当第一个任务输出 ---one Task--- 后等待 1s 后第二个任务会输出 ---two Task---, 但是执行完毕代码后输出结果如下所示: 例子2, public class Shedule { private static long start; public static void main(String[] args) { TimerTask task = new TimerTask() { public void run() { System.out.println(System.currentTimeMillis()-start); try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } } }; TimerTask task1 = new TimerTask() { @Override public void run() { System.out.println(System.currentTimeMillis()-start); } }; Timer timer = new Timer(); start = System.currentTimeMillis(); //启动一个调度任务,1S钟后执行 timer.schedule(task,1000); //启动一个调度任务,3S钟后执行 timer.schedule(task1,3000); } } 上面程序我们预想是第一个任务执行后,第二个任务3S后执行的,即输出一个1000,一个3000. 实际运行结果如下: 实际运行结果并不如我们所愿。世界结果,是过了4S后才输出第二个任务,即4001约等于4秒。那部分时间时间到哪里去了呢?那个时间是被我们第一个任务的sleep所占用了。 现在我们在第一个任务中去掉Thread.sleep();这一行代码,运行是否正确了呢?运行结果如下: 可以看到确实是第一个任务过了1S后执行,第二个任务在第一个任务执行完后过3S执行了。 这就说明了Timer只创建唯一的线程来执行所有Timer任务。如果一个timer任务的执行很耗时,会导致其他TimerTask的时效准确性出问题。 Timer 实现原理分析 下面简单介绍下 Timer 的原理,如下图是 Timer 的原理模型介绍: 1.其中 TaskQueue 是一个平衡二叉树堆实现的优先级队列,每个 Timer 对象内部有唯一一个 TaskQueue 队列。用户线程调用 timer 的 schedule 方法就是把 TimerTask 任务添加到 TaskQueue 队列,在调用 schedule 的方法时候 long delay 参数用来说明该任务延迟多少时间执行。 2.TimerThread 是具体执行任务的线程,它从 TaskQueue 队列里面获取优先级最小的任务进行执行,需要注意的是只有执行完了当前的任务才会从队列里面获取下一个任务而不管队列里面是否有已经到了设置的 delay 时间,一个 Timer 只有一个 TimerThread 线程,所以可知 Timer 的内部实现是一个多生产者单消费者模型。 从实现模型可以知道要探究上面的问题只需看 TimerThread 的实现就可以了,TimerThread 的 run 方法主要逻辑源码如下: public void run() { try { mainLoop(); } finally { // 有人杀死了这个线程,表现得好像Timer已取消 synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // 消除过时的引用 } } } private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; //从队列里面获取任务时候要加锁 synchronized(queue) { ...... } if (taskFired) task.run();//执行任务 } catch(InterruptedException e) { } } } 可知当任务执行过程中抛出了除 InterruptedException 之外的异常后,唯一的消费线程就会因为抛出异常而终止,那么队列里面的其他待执行的任务就会被清除。所以 TimerTask 的 run 方法内最好使用 try-catch 结构 catch 主可能的异常,不要把异常抛出到 run 方法外。 其实要实现类似 Timer 的功能使用 ScheduledThreadPoolExecutor 的 schedule 是比较好的选择。ScheduledThreadPoolExecutor 中的一个任务抛出了异常,其他任务不受影响的。 ScheduledThreadPoolExecutor 例子如下: /** * Created by cong on 2018/7/12. */ public class ScheduledThreadPoolExecutorTest { static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); public static void main(String[] args) { scheduledThreadPoolExecutor.schedule(new Runnable() { public void run() { System.out.println("---one Task---"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } throw new RuntimeException("error "); } }, 500, TimeUnit.MICROSECONDS); scheduledThreadPoolExecutor.schedule(new Runnable() { public void run() { for (int i =0;i<5;++i) { System.out.println("---two Task---"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }, 1000, TimeUnit.MICROSECONDS); scheduledThreadPoolExecutor.shutdown(); } } 运行结果如下: 之所以 ScheduledThreadPoolExecutor 的其他任务不受抛出异常的任务的影响是因为 ScheduledThreadPoolExecutor 中的 ScheduledFutureTask 任务中 catch 掉了异常,但是在线程池任务的 run 方法内使用 catch 捕获异常并打印日志是最佳实践。

优秀的个人博客,低调大师

Java并发编程笔记之SimpleDateFormat源码分析

SimpleDateFormat 是 Java 提供的一个格式化和解析日期的工具类,日常开发中应该经常会用到,但是由于它是线程不安全的,多线程公用一个 SimpleDateFormat 实例对日期进行解析或者格式化会导致程序出错,本节就讨论下它为何是线程不安全的,以及如何避免。 为了复现上面所说的不安全,我们要用一个例子来突出这个不安全,例子如下: package com.hjc; import java.text.ParseException; import java.text.SimpleDateFormat; /** * Created by cong on 2018/7/12. */ public class SimpleDateFormatTest { //(1)创建单例实例 static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { //(2)创建多个线程,并启动 for (int i = 0; i <10 ; ++i) { Thread thread = new Thread(new Runnable() { public void run() { try {//(3)使用单例日期实例解析文本 System.out.println(sdf.parse("2018-07-12 15:18:00")); } catch (ParseException e) { e.printStackTrace(); } } }); thread.start();//(4)启动线程 } } } 运行结果如下: 代码(1)创建了 SimpleDateFormat 的一个实例,代码(2)创建 10 个线程,每个线程都公用同一个 sdf 对象对文本日期进行解析,多运行几次就会抛出 java.lang.NumberFormatException 异常,加大线程的个数有利于该问题复现。 为什么会出现这样的问题呢? 那么接下来我们就要进入到SimpleDateFormat 源码一探究竟,为了便于分析首先查看 SimpleDateFormat 的类图结构,类图如下所示: 可知每个 SimpleDateFormat 实例里面有一个 Calendar 对象,到后面就会知道SimpleDateFormat 之所以是线程不安全的,其实就是因为 Calendar 是线程不安全的,后者之所以是线程不安全的是因为其中存放日期数据的变量都是线程不安全的,比如里面的 fields,time 等。 接下来我们要看看parse方法到底干了些什么事,源码如下: public Date parse(String text, ParsePosition pos) { //(1)解析日期字符串放入CalendarBuilder的实例calb中,源码很长,省略一部分,自己去看 ..... Date parsedDate; try {//(2)使用calb中解析好的日期数据设置calendar parsedDate = calb.establish(calendar).getTime(); ... } catch (IllegalArgumentException e) { ... return null; } return parsedDate; } Calendar establish(Calendar cal) { ... //(3)重置日期对象cal的属性值 cal.clear(); //(4) 使用calb中中属性设置cal ... //(5)返回设置好的cal对象 return cal; } 代码(1)主要的作用是解析字符串日期并把解析好的数据放入了 CalendarBuilder 的实例 calb 中,CalendarBuilder 是一个建造者模式,用来存放后面需要的数据。 代码(3)重置 Calendar 对象里面的属性值,源码如下: public final void clear(){ for (int i = 0; i < fields.length; ) { stamp[i] = fields[i] = 0; // UNSET == 0 isSet[i++] = false; } areAllFieldsSet = areFieldsSet = false; isTimeSet = false; } 代码(4)使用 calb 中解析好的日期数据设置 cal 对象 代码(5) 返回设置好的 cal 对象 从上面步骤可知代码(3)(4)(5)操作不是原子性操作,当多个线程调用 parse 方法时候比如线程 A 执行了代码(3)(4)也就是设置好了 cal 对象,在执行代码(5)前线程 B 执行了代码(3)清空了 cal 对象,由于多个线程使用的是一个 cal 对象,所以线程 A 执行代码(5)返回的就可能是被线程 B 清空后的对象,当然也有可能线程 B 执行了代码(4)被线程 B 修改后的 cal 对象。从而导致程序错误。 那么,让我们思考一个问题,如何解决SimpleDateFormat 的线程安全性问题呢? 1.第一种方式:每次使用时候 new 一个 SimpleDateFormat 的实例,这样可以保证每个实例使用自己的 Calendar 实例, 但是每次使用都需要 new 一个对象,并且使用后由于没有其它引用,就会需要被回收,开销会很大。 2.第二种方式:究其原因是因为多线程下代码(3)(4)(5)三个步骤不是一个原子性操作,那么容易想到的是对其进行同步,让(3)(4)(5)成为原子操作,可以使用 synchronized 进行同步,例子改造如下所示: /** * Created by cong on 2018/7/12. */ public class SimpleDateFormatTest1 { //(1)创建单例实例 static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { //(2)创建多个线程,并启动 for (int i = 0; i <10 ; ++i) { Thread thread = new Thread(new Runnable() { public void run() { try {// (3)使用单例日期实例解析文本 synchronized (sdf) { System.out.println(sdf.parse("2018-07-12 15:18:00")); } } catch (ParseException e) { e.printStackTrace(); } } }); thread.start();//(4)启动线程 } } } 运行结果如下: 3.第三种方式:使用 ThreadLocal,这样每个线程只需要使用一个 SimpleDateFormat 实例相比第一种方式大大节省了对象的创建销毁开销,并且不需要对多个线程直接进行同步,使用 ThreadLocal 方式来保证线程安全,例子如下: /** * Created by cong on 2018/7/12. */ public class SimpleDateFormatTest2 { // (1)创建threadlocal实例 static ThreadLocal<DateFormat> safeSdf = new ThreadLocal<DateFormat>(){ @Override protected SimpleDateFormat initialValue(){ return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); } }; public static void main(String[] args) { // (2)创建多个线程,并启动 for (int i = 0; i < 10; ++i) { Thread thread = new Thread(new Runnable() { public void run() { try {// (3)使用单例日期实例解析文本 System.out.println(safeSdf.get().parse("2018-07-12 15:18:00")); } catch (ParseException e) { e.printStackTrace(); }finally { //(4)使用完毕记得清除,避免内存泄露 safeSdf.remove(); } } }); thread.start();// (4)启动线程 } } } 运行结果如下: 代码(1)创建了一个线程安全的 SimpleDateFormat 实例,代码(3)在使用的时候首先使用 get() 方法获取当前线程下 SimpleDateFormat 的实例,在第一次调用 ThreadLocal 的 get()方法适合会触发其 initialValue 方法用来创建当前线程所需要的 SimpleDateFormat 对象。另外需要注意的是代码(4)使用完毕线程变量后要记得进行清理,以避免内存泄露。

优秀的个人博客,低调大师

Java并发编程-volatile关键字介绍

前言 要学习好Java的多线程,就一定得对volatile关键字的作用机制了熟于胸。最近博主看了大量关于volatile的相关博客,对其有了一点初步的理解和认识,下面通过自己的话叙述整理一遍。 有什么用? volatile主要对所修饰的变量提供两个功能 可见性 防止指令重排序 本篇博客主要对volatile可见性进行探讨,以后发表关于指令重排序的博文。 什么是可见性? 一图胜千言上图已经把JAVA内存模型(JMM)展示得很详细了,简单概括一下 每个Thread有一个属于自己的工作内存(可以理解为每个厨师有一个属于自己的铁锅) 所有Thread共用一个主内存(餐厅所有的厨师共用同一个冰箱) 每个Thread操作数据之前都会去主内存中获取数据(厨师炒菜之前都要去冰箱里拿食材) Thread:厨师 工作内存:铁锅 store&load:放熟食,取食材 主内存:冰箱 读者可思考以下情景:餐厅来了一位顾客点了一份红烧肉,此时有两位大厨(假设大厨之间互不通信),由于互不通信,所以两位大厨都打开冰箱取出食材开始炒菜。最后炒出了两份红烧肉,顾客只要一份。为什么会造成这种结果? 由于大厨之间没有可见性。 将此情景放在JAVA中即是:线程A从主内存中取了一个变量到工作内存中,操作完毕后没有及时放回主内存中,于是线程B去取这个变量已经过期了,取的是线程A操作之前的变量。 如何拥有可见性? 先介绍一下Java内存模型中定义的8种工作内存与主内存之间的原子操作 lock( 锁定 ):作用于主内存的变量,把一个变量标识为一条线程独占的状态。 unlock(解锁):作用于主内存的变量,把一个处于锁定的变量释放出来,释放变量才可以被其他线程锁定。 read(读取):作用于主内存的变量,把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用。 load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。 use(使用):作用于工作内存种的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量的值的字节码指令时将会执行这个操作。 assign(赋值):作用于工作内存中的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。 store(存储):作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后的write操作使用 write(写入):作用于主内存的变量,它把store操作从工作内存中得到的值放入主内存的变量中。 读取赋值一个普通变量的情况 当线程1对主内存对象发起read操作到write操作套流程的时间里,线程2随时都有可能对这个主内存对象发起第二套操作 有什么危害呢? 假设主内存中有一个 int a=0; 线程1和线程2分别执行一次,理想状态下最终a的值为2. a++; 线程1在执行了assign操作之后变量a的真实值已经从0变成了1,但是这个过程发生在工作内存中对其他线程不可见,若线程2此时对变量a的操作,读取到的值仍然为0,因为没有可见性,线程2的操作也仅仅是重复了线程1的操作,再次让a从0变成了1。并没有达到期望的a=2。 读取赋值一个volatile变量的情况 volatile变量对对象的操作更严格: use之前不能被read&load assign之后必须紧跟store&write 也就是说 read-load-use 和 assign-store-write成为了两个不可分割的原子操作 尽管这时候在use和assign之间依然有一段真空期,有可能变量会被其他线程读取,但是无论在哪一个时间点主内存的变量和任一工作内存的变量的值都是相等的。这个特性就导致了volatile变量不适合参与到依赖当前值的运算,如自增。那么依靠可见性的特点volatile可以用在哪些地方呢?《Java虚拟机》提到: 运算结果并不依赖变量的当前值(即结果对产生中间结果不依赖),或者能够确保只有单一的线程修改变量的值 通常volatile用做保存某个状态的boolean值。 部分参考自 volatile变量与普通变量的区别 <<深入理解Java虚拟机 高级特性与最佳实践>>

优秀的个人博客,低调大师

Java并发编程笔记之CyclicBarrier源码分析

JUC 中 回环屏障 CyclicBarrier 的使用与分析,它也可以实现像 CountDownLatch 一样让一组线程全部到达一个状态后再全部同时执行,但是 CyclicBarrier 可以被复用。那么 CyclicBarrier 内部的实现与 CountDownLatch 有何不同那? CounDownLatch在解决多个线程同步方面相对于调用线程的 join 已经提供了不少改进,但是CountDownLatch的计数器是一次性的,也就是等到计数器变为0后,再调用CountDownLatch的await ()和countDown()方法都会立刻返回,这就起不到线程同步的效果了。CyclicBarrier类的功能不限于CountDownLatch所提供的功能,从字面意思理解CyclicBarrier是回环屏障的意思,它可以实现让一组线程全部达到一个状态后再全部同时执行。这里之所以叫做回环是因为当所有等待线程执行完毕之后,重置CyclicBarrier的状态后可以被重用。下图演示了这一过程。 一.CyclicBarrier的实现原理 为了能一览CyclicBarrier的架构设计,下面先看下CyclicBarrier的类图,如下图: 如上面类图,可以知道CyclicBarrier 内部并不是直接使用AQS实现,而是使用了独占锁ReentrantLock来实现的同步;parties用来记录线程个数,用来表示需要多少线程先调用await后,所有线程才会冲破屏障继续往下运行;而 count 一开始等一parties,每当线程调用await方法后就递减 1 ,当为 0 的时候就表示所有线程都到了屏障点,另外你可能会疑惑为何维护parties 和 count 这两个变量,只有count 不就行了吗?别忘了CyclicBarries是可以被复用的,使用两个变量原因是用parties始终来记录总的线程个数,当count计数器变为 0 后,会使用parties 赋值给count,已达到复用的作用。这两个变量是在构造CyclicBarries对象的时候传递的,源码如下: 这里还有一个变量barrierConmmand也通过构造函数传递而来,这是一个任务,这个任务的执行时机是当所有线程都达到屏障点后。另外CyclicBarrier内部使用独占锁Lock来保证同时只有一个线程调用await方法时候才可以返回,使用lock首先保证了更新计数器count 的原子性,另外使用lock的条件变量 trip 支持了 线程间使用 notify,await 操作进行同步。 最后变量generation内部就一个变量broken用来记录当前屏障是否被打破,另外注意这里broken并没有被声明为volatile ,这是因为锁内使用变量不需要。源码如下: private static class Generation { boolean broken = false; } 接下来重点看一下CyclicBarrier的几个重要的函数,如下: 1.int await() 当前线程调用 CyclicBarrier 的该方法时候,当前线程会被阻塞,知道满足下面条件之一才会返回:(1)parties 个线程都调用了 await()方法,也就是线程都到了屏障点。(2)其他线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException 异常返回。(3)当前屏障点关联的Generation对象的broken标志被设置为true的时候,会抛出BrokenBarrierException 异常。源码如下: public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } 正如上面代码可以知道内部调用了dowait 方法,第一个参数false说明不设置超时时间,这时候第二个参数没有意义。 2.boolean await(long timeout, TimeUnit unit) 当前线程调用 CyclicBarrier 的该方法时候当前线程会被阻塞,直到满足下面条件之一才会返回: (1) parties 个线程都调用了 await() 函数,也就是线程都到了屏障点,这时候返回 true。 (2) 当设置的超时时间到了后返回 false (3) 其它线程调用了当前线程的 interrupt()方法中断了当前线程,则当前线程会抛出 InterruptedException 异常返回。 (4) 当前屏障点关联的 Generation 对象的 broken 标志被设置为 true 时候,会抛出 BrokenBarrierException 异常。源码如下: public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } 正如上面代码可以知道内部调用了dowait 方法,第一个参数true说明设置超时时间,这时候第二个参数是超时时间。 3.int dowait(boolean timed, long nanos) 该方法是实现 CyclicBarrier 的核心功能,源码如下: private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { ... //(1)如果index==0说明所有线程都到到了屏障点,则执行初始化时候传递的任务 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { //(2)执行任务 if (command != null) command.run(); ranAction = true; //(3)激活其它调用await而被阻塞的线程,并重置CyclicBarrier nextGeneration(); //返回 return 0; } finally { if (!ranAction) breakBarrier(); } } // (4)如果index!=0 for (;;) { try { //(5)没有设置超时时间, if (!timed) trip.await(); //(6)设置了超时时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { ... } ... } } finally { lock.unlock(); } } private void nextGeneration() { //(7)唤醒条件队列里面阻塞线程 trip.signalAll(); //(8) 重置CyclicBarrier count = parties; generation = new Generation(); } 上面代码是dowait方法的主干代码,当一个线程调用了dowait方法后首先会获取独占锁lock,如果创建CyclicBarrier的时候传递的参数为 10 ,那么后面 9 个调用线程会被阻塞;然后当前获取线程对计数器count进行递减操作,递减后的count = index = 9 ,因为 index != 0 ,所以当前线程会执行代码(4)。如果是无参数的当前线程调用的是无参数的await()方法,则这里 timed = false,所以当前线程会被放入条件变量trip的阻塞队列,当前线程会被挂起并释放获取的Lock锁;如果调用的有参数的await 方法 则timed = true,则当前线程也会被放入条件变量阻塞队列并释放锁的资源,但是不同的是当前线程会在指定时间超时后自动激活。 当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的 9 个线程中有一个会竞争到lock锁,然后执行第一个线程同样的操作,直到最后一个线程获取到lock的时候,已经有 9 个线程被放入了Lock 的条件队列里面,最后一个线程 count 递减后,count = index 等于 0 ,所以执行代码(2),如果创建CyclicBarrier的时候传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他 9 个线程,并重置CyclicBarrier,然后这 10个线程就可以继续向下执行了。 到目前位置理解了CyclicBarrier的原理后,接下来用几个例子来加深对CyclicBarrier的理解,下面例子我们要实现的是使用两个线程去执行一个被分解的任务 A,当两个线程把自己的任务都执行完毕后在对它们的结果进行汇总处理。例子如下: package com.hjc; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by cong on 2018/7/7. */ public class CyclicBarrierTest1 { // 创建一个CyclicBarrier实例,添加一个所有子线程全部到达屏障后执行的一个任务 private static volatile CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { public void run() { System.out.println(Thread.currentThread() + " task1 merge result"); } }); public static void main(String[] args) throws InterruptedException { //创建一个线程个数固定为2的线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 加入线程A到线程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " task1-1"); System.out.println(Thread.currentThread() + " enter in barrier"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " enter out barrier"); } catch (Exception e) { e.printStackTrace(); } } }); // 加入线程B到线程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " task1-2"); System.out.println(Thread.currentThread() + " enter in barrier"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " enter out barrier"); } catch (Exception e) { e.printStackTrace(); } } }); // 关闭线程池 executorService.shutdown(); } } 运行结果如下: 如上代码创建了一个 CyclicBarrier 对象,第一个参数为计数器初始值,第二个参数 Runable 是指当计数器为 0 时候需要执行的任务。main 函数里面首先创建了固定大小为 2 的线程池,然后添加两个子任务到线程池,每个子任务在执行完自己的逻辑后会调用 await 方法。 一开始计数器为 2,当第一个线程调用 await 方法时候,计数器会递减为 1,由于计数器不为 0,所以当前线程就到了屏障点会被阻塞,然后第二个线程调用 await 时候,会进入屏障,计数器也会递减现在计数器为 0,就会去执行在 CyclicBarrier 构造时候的任务,执行完毕后就会退出屏障点,并且会唤醒被阻塞的第一个线程,这时候第一个线程也会退出屏障点继续向下运行。 上面的例子说明了多个线程之间是相互等待的,假如计数器为 N,那么调用 await 方法的前面 N-1 的线程都会因为到达屏障点被阻塞,当第 N 个线程调用 await 后,计数器为 0 了,这时候第 N 个线程才会发出通知唤醒前面的 N-1 个线程。也就是全部线程达到屏障点时候才能一块继续向下执行,对与这个例子来说使用 CountDownLatch 也可以达到类似输出结果。 下面在放个例子来说明 CyclicBarrier 的可复用性。 假设一个任务由阶段 1、阶段 2、阶段 3 组成,每个线程要串行的执行阶段 1 和 2 和 3,多个线程执行该任务时候,必须要保证所有线程的阶段 1 全部完成后才能进行阶段 2 执行,所有线程的阶段 2 全部完成后才能进行阶段 3 执行,下面使用 CyclicBarrier 来完成这个需求。例子如下: package com.hjc; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by cong on 2018/7/7. */ public class CyclicBarrierTest2 { // 创建一个CyclicBarrier实例 private static volatile CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); // 加入线程A到线程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " step3"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); // 加入线程B到线程池 executorService.submit(new Runnable() { public void run() { try { System.out.println(Thread.currentThread() + " step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + " step3"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //关闭线程池 executorService.shutdown(); } } 运行结果如下: 如上代码,在每个子线程执行完 step1 后都调用了 await 方法,所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程完成了 step1 后才会开始执行 step2,然后在 step2 后面调用了 await 方法,这保证了所有线程的 step2 完成后,线程才能开始 step3 的执行,这个功能使用单个 CountDownLatch 是无法完成的。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册