Java并发编程(04):线程间通信,等待/通知机制
一、概念简介
1、线程通信
在操作系统中,线程是个独立的个体,但是在线程执行过程中,如果处理同一个业务逻辑,可能会产生资源争抢,导致并发问题,通常使用互斥锁来控制该逻辑。但是在还有这样一类场景,任务执行是有顺序控制的,例如常见的报表数据生成:
- 启动数据分析任务,生成报表数据;
- 报表数据存入指定位置数据容器;
- 通知数据搬运任务,把数据写入报表库;
该场景在相对复杂的系统中非常常见,如果基于多线程来描述该过程,则需要线程之间通信协作,才能有条不紊的处理该场景业务。
2、等待通知机制
如上的业务场景,如果线程A生成数据过程中,线程B一直在访问数据容器,判断该过程的数据是否已经生成,则会造成资源浪费。正常的流程应该如图,线程A和线程B同时启动,线程A开始处理数据生成任务,线程B尝试获取容器数据,数据还没过来,线程B则进入等待状态,当线程A的任务处理完成,则通知线程B去容器中获取数据,这样基于线程等待和通知的机制来协作完成任务。
3、基础方法
等待/通知机制的相关方法是Java中Object层级的基础方法,任何对象都有该方法:
- notify:随机通知一个在该对象上等待的线程,使其结束wait状态返回;
- notifyAll:唤醒在该对象上所有等待的线程,进入对象锁争抢队列中;
- wait:线程进入waiting等待状态,不会争抢锁对象,也可以设置等待时间;
线程的等待通知机制,就是基于这几个基础方法。
二、等待通知原理
1、基本原理
等待/通知机制,该模式下指线程A在不满足任务执行的情况下调用对象wait()方法进入等待状态,线程B修改了线程A的执行条件,并调用对象notify()或者notifyAll()方法,线程A收到通知后从wait状态返回,进而执行后续操作。两个线程通过基于对象提供的wait()/notify()/notifyAll()等方法完成等待和通知间交互,提高程序的可伸缩性。
2、实现案例
通过线程通信解决上述数据生成和存储任务的解耦流程。
public class NotifyThread01 {
static Object lock = new Object() ;
static volatile List<String> dataList = new ArrayList<>();
public static void main(String[] args) throws Exception {
Thread saveThread = new Thread(new SaveData(),"SaveData");
saveThread.start();
TimeUnit.SECONDS.sleep(3);
Thread dataThread = new Thread(new AnalyData(),"AnalyData");
dataThread.start();
}
// 等待数据生成,保存
static class SaveData implements Runnable {
@Override
public void run() {
synchronized (lock){
while (dataList.size()==0){
try {
System.out.println(Thread.currentThread().getName()+"等待...");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("SaveData .."+ dataList.get(0)+dataList.get(1));
}
}
}
// 生成数据,通知保存
static class AnalyData implements Runnable {
@Override
public void run() {
synchronized (lock){
dataList.add("hello,");
dataList.add("java");
lock.notify();
System.out.println("AnalyData End...");
}
}
}
}
注意:除了dataList满足写条件,还要在AnalyData线程执行通知操作。
三、管道流通信
1、管道流简介
基本概念
管道流主要用于在不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读取数据,进而实现不同线程间的通信。
实现分类
管道字节流:PipedInputStream和PipedOutputStream;
管道字符流:PipedWriter和PipedReader;
新IO管道流:Pipe.SinkChannel和Pipe.SourceChannel;
2、使用案例
public class NotifyThread02 {
public static void main(String[] args) throws Exception {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream();
// 链接输入流和输出流
pos.connect(pis);
// 写数据线程
new Thread(new Runnable() {
public void run() {
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
// 将从键盘读取的数据写入管道流
PrintStream ps = new PrintStream(pos);
while (true) {
try {
System.out.print(Thread.currentThread().getName());
ps.println(br.readLine());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, "输入数据线程:").start();
// 读数据线程
new Thread(new Runnable() {
public void run() {
BufferedReader br = new BufferedReader(new InputStreamReader(pis));
while (true) {
try {
System.out.println(Thread.currentThread().getName() + br.readLine());
} catch (IOException e) {
e.printStackTrace();
}
}
}
}, "输出数据线程:").start();
}
}
写线程向管道流写入数据,读线程读取数据,完成基本通信流程。
四、生产消费模式
1、业务场景
基于线程等待通知机制:实现工厂生产一件商品,通知商店卖出一件商品的业务流程。
2、代码实现
public class NotifyThread03 {
public static void main(String[] args) {
Product product = new Product();
ProductFactory productFactory = new ProductFactory(product);
ProductShop productShop = new ProductShop(product);
productFactory.start();
productShop.start();
}
}
// 产品
class Product {
public String name ;
public double price ;
// 产品是否生产完毕,默认没有
boolean flag ;
}
// 产品工厂:生产
class ProductFactory extends Thread {
Product product ;
public ProductFactory (Product product){
this.product = product;
}
@Override
public void run() {
int i = 0 ;
while (i < 20) {
synchronized (product) {
if (!product.flag){
if (i%2 == 0){
product.name = "鼠标";
product.price = 79.99;
} else {
product.name = "键盘";
product.price = 89.99;
}
System.out.println("产品:"+product.name+"【价格:"+product.price+"】出厂...");
product.flag = true ;
i++;
// 通知消费者
product.notifyAll();
} else {
try {
// 进入等待状态
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
// 产品商店:销售
class ProductShop extends Thread {
Product product ;
public ProductShop (Product product){
this.product = product ;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (product.flag == true ){
System.out.println("产品:"+product.name+"【价格"+(product.price*2)+"】卖出...");
product.flag = false ;
product.notifyAll(); //唤醒生产者
} else {
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
流程描述:ProductFactory生成一件商品,通知商店售卖,通过flag标识判断控制是否进入等待状态,商店卖出商品后,再次通知工厂生产商品。
五、源代码地址
GitHub·地址
https://github.com/cicadasmile/java-base-parent
GitEE·地址
https://gitee.com/cicadasmile/java-base-parent

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
阿里云新品发布会周刊第53期 丨 数据泄露的危害有多大?该怎么保护我们的数据信息安全
点击订阅新品发布会 新产品、新版本、新技术、新功能、价格调整,评论在下方,下期更新!关注更多新品发布会! 热门阅读 1、数据泄露的危害有多大?该怎么保护我们的数据信息安全 在信息时代阔步发展的今天,人们在享受着信息化社会所带来的智能、高效、便捷的同时,也对自身的个人信息安全产生了深深的担忧……“信息裸奔”让人们成了“透明人”,隐私泄露层出不穷,财产受损现象频繁发生。你的信息安全吗?数据泄露的危害有多大?该怎么保护我们的数据信息安全? 查看原文 2、疫情之后,半数美国人认为智能设备中语音控制必不可少 据Syntiant近期进行的一项用户调查显示,受新冠疫情影响,超过一半的美国人认为智能设备中语音控制功能必不可少。其中,免提智能手机和电视遥控器的优先级比较高,因为消费者选择语音控制以减少接触。 查看原文 3、深度数据对比分析:阿里云服务器和腾讯云服务器那家好? 服务器具有维护成本低,安全稳定,高可扩展性和 7 X 24 小时的售后支持的优势,因此云服务器成为中小企业建站的首要选择。国内的云服务器竞争也进入了跑马圈地的时代,以阿里云、腾讯云、百度云三大BAT为首,不断推出优惠活动,争取更多的...
-
下一篇
zyplayer-doc 1.0.5 发布,小目标是在线化的 Navicat 数据库管理工具
zyplayer-doc是一款在线文档工具,现有swagger 文档、dubbo文档、数据库文档、WIKI文档、ElasticSearch文档等,管理端具有人员管理、权限管理功能等功能。项目后端使用spring-boot、mybatis-plus等框架,前端使用zui、Vue、element-ui等框架。 Added 1、swagger文档支持禁用和启用,默认值及展示优化 #群需求 2、数据库文档增加oracle支持#群友提供#,去掉jta事务管理 #I1DU2K 3、新增数据互导功能,可支持不同库表间的数据互导 4、数据库文档以tab多标签页形式展示,页面不销毁 5、增加表数据查看功能,可分页查看表数据,SQL美化功能 6、增加DDL建表语句展示,优化数据源连接池管理 7、导出数据库表增加excel格式导出,库表列表展示改为 表名+表注释 #I17PPP #I17OXH 本项目地址:https://gitee.com/zyplayer/zyplayer-doc 在线体验地址:http://doc.zyplayer.com 账号:zyplayer 密码:123456 本次更新着重优化了...
相关文章
文章评论
共有0条评论来说两句吧...