Java入门系列-25-NIO(实现非阻塞网络通信)
还记得之前介绍NIO时对比传统IO的一大特点吗?就是NIO是非阻塞式的,这篇文章带大家来看一下非阻塞的网络操作。
补充:以数组的形式使用缓冲区
package testnio; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class TestBufferArray { public static void main(String[] args) throws IOException { RandomAccessFile raf1=new RandomAccessFile("D:/1.txt","rw"); //1.获取通道 FileChannel channel1=raf1.getChannel(); //2.创建缓冲区数组 ByteBuffer buf1=ByteBuffer.allocate(512); ByteBuffer buf2=ByteBuffer.allocate(512); ByteBuffer[] bufs= {buf1,buf2}; //3.将数据读入缓冲区数组 channel1.read(bufs); for (ByteBuffer byteBuffer : bufs) { byteBuffer.flip(); } System.out.println(new String(bufs[0].array(),0,bufs[0].limit())); System.out.println("-----------"); System.out.println(new String(bufs[1].array(),0,bufs[1].limit())); //写入缓冲区数组到通道中 RandomAccessFile raf2=new RandomAccessFile("D:/2.txt","rw"); FileChannel channel2=raf2.getChannel(); channel2.write(bufs); } }
使用NIO实现阻塞式网络通信
TCP协议的网络通信传统实现方式是通过套接字编程(Socket和ServerSocket),NIO实现TCP网络通信需要用到 Channel 接口的两个实现类:SocketChannel和ServerSocketChannel
使用NIO实现阻塞式网络通信
客户端
package com.jikedaquan.blockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Client { public static void main(String[] args) { SocketChannel sChannel=null; FileChannel inChannel=null; try { //1、获取通道 sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666)); //用于读取文件 inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ); //2、分配指定大小的缓冲区 ByteBuffer buf=ByteBuffer.allocate(1024); //3、读取本地文件,发送到服务器端 while(inChannel.read(buf)!=-1) { buf.flip(); sChannel.write(buf); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { //关闭通道 if (inChannel!=null) { try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
new InetSocketAddress("127.0.0.1", 1666) 用于向客户端套接字通道(SocketChannel)绑定要连接地址和端口
服务端
package com.jikedaquan.blockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Server { public static void main(String[] args) { ServerSocketChannel ssChannel=null; FileChannel outChannel=null; SocketChannel sChannel=null; try { //1、获取通道 ssChannel = ServerSocketChannel.open(); //用于保存文件的通道 outChannel = FileChannel.open(Paths.get("F:/b.jpg"), StandardOpenOption.WRITE,StandardOpenOption.CREATE); //2、绑定要监听的端口号 ssChannel.bind(new InetSocketAddress(1666)); //3、获取客户端连接的通道 sChannel = ssChannel.accept(); //4、分配指定大小的缓冲区 ByteBuffer buf=ByteBuffer.allocate(1024); //5、接收客户端的数据,并保存到本地 while(sChannel.read(buf)!=-1) { buf.flip(); outChannel.write(buf); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { //6、关闭通道 if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(outChannel!=null) { try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(ssChannel!=null) { try { ssChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服务端套接字仅绑定要监听的端口即可
ssChannel.bind(new InetSocketAddress(1666));
上面的代码使用NIO实现的网络通信,可能有同学会问,没有看到阻塞效果啊,确实是阻塞式的看不到效果,因为客户端发送一次数据就结束了,服务端也是接收一次数据就结束了。那如果服务端接收完成数据后,再向客户端反馈呢?
能够看到阻塞效果的网络通信
客户端
package com.jikedaquan.blockingnio2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Client { public static void main(String[] args) { SocketChannel sChannel=null; FileChannel inChannel=null; try { sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666)); inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ); ByteBuffer buf=ByteBuffer.allocate(1024); while(inChannel.read(buf)!=-1) { buf.flip(); sChannel.write(buf); buf.clear(); } //sChannel.shutdownOutput();//去掉注释掉将不会阻塞 //接收服务器端的反馈 int len=0; while((len=sChannel.read(buf))!=-1) { buf.flip(); System.out.println(new String(buf.array(),0,len)); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { if(inChannel!=null) { try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服务端
package com.jikedaquan.blockingnio2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Server { public static void main(String[] args) { ServerSocketChannel ssChannel=null; FileChannel outChannel=null; SocketChannel sChannel=null; try { ssChannel = ServerSocketChannel.open(); outChannel = FileChannel.open(Paths.get("F:/a.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE); ssChannel.bind(new InetSocketAddress(1666)); sChannel = ssChannel.accept(); ByteBuffer buf=ByteBuffer.allocate(1024); while(sChannel.read(buf)!=-1) { buf.flip(); outChannel.write(buf); buf.clear(); } //发送反馈给客户端 buf.put("服务端接收数据成功".getBytes()); buf.flip(); sChannel.write(buf); } catch (IOException e) { e.printStackTrace(); }finally { if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(outChannel!=null) { try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(ssChannel!=null) { try { ssChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服务端将向客户端发送两次数据
选择器(Selector)
想要实现非阻塞的IO,必须要先弄懂选择器。Selector 抽象类,可通过调用此类的 open 方法创建选择器,该方法将使用系统的默认选择器提供者创建新的选择器。
将通道设置为非阻塞之后,需要将通道注册到选择器中,注册的同时需要指定一个选择键的类型 (SelectionKey)。
选择键(SelectionKey)可以认为是一种标记,标记通道的类型和状态。
SelectionKey的静态字段:
OP_ACCEPT:用于套接字接受操作的操作集位
OP_CONNECT:用于套接字连接操作的操作集位
OP_READ:用于读取操作的操作集位
OP_WRITE:用于写入操作的操作集位
用于检测通道状态的方法:
方法名称 | 说明 |
---|---|
isAcceptable() | 测试此键的通道是否已准备好接受新的套接字连接 |
isConnectable() | 测试此键的通道是否已完成其套接字连接操作 |
isReadable() | 测试此键的通道是否已准备好进行读取 |
isWritable() | 测试此键的通道是否已准备好进行写入 |
将通道注册到选择器:
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
IO操作准备就绪的通道大于0,轮询选择器
while(selector.select()>0) { //获取选择键,根据不同的状态做不同的操作 }
实现非阻塞式TCP协议网络通信
非阻塞模式:channel.configureBlocking(false);
客户端
package com.jikedaquan.nonblockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Scanner; public class Client { public static void main(String[] args) { SocketChannel sChannel=null; try { //1、获取通道 sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",1666)); //2、切换非阻塞模式 sChannel.configureBlocking(false); //3、分配指定大小的缓冲区 ByteBuffer buf=ByteBuffer.allocate(1024); //4、发送数据给服务端 Scanner scanner=new Scanner(System.in); //循环从控制台录入数据发送给服务端 while(scanner.hasNext()) { String str=scanner.next(); buf.put((new Date().toString()+"\n"+str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { //5、关闭通道 if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服务端
package com.jikedaquan.nonblockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class Server { public static void main(String[] args) throws IOException { //1、获取通道 ServerSocketChannel ssChannel=ServerSocketChannel.open(); //2、切换非阻塞模式 ssChannel.configureBlocking(false); //3、绑定监听的端口号 ssChannel.bind(new InetSocketAddress(1666)); //4、获取选择器 Selector selector=Selector.open(); //5、将通道注册到选择器上,并指定“监听接收事件” ssChannel.register(selector, SelectionKey.OP_ACCEPT); //6、轮询式的获取选择器上已经 “准备就绪”的事件 while(selector.select()>0) { //7、获取当前选择器中所有注册的“选择键(已就绪的监听事件)” Iterator<SelectionKey> it=selector.selectedKeys().iterator(); while(it.hasNext()) { //8、获取准备就绪的事件 SelectionKey sk=it.next(); //9、判断具体是什么事件准备就绪 if(sk.isAcceptable()) { //10、若“接收就绪”,获取客户端连接 SocketChannel sChannel=ssChannel.accept(); //11、切换非阻塞模式 sChannel.configureBlocking(false); //12、将该通道注册到选择器上 sChannel.register(selector, SelectionKey.OP_READ); }else if(sk.isReadable()) { //13、获取当前选择器上“读就绪”状态的通道 SocketChannel sChannel=(SocketChannel)sk.channel(); //14、读取数据 ByteBuffer buf=ByteBuffer.allocate(1024); int len=0; while((len=sChannel.read(buf))>0) { buf.flip(); System.out.println(new String(buf.array(),0,len)); buf.clear(); } } //15、取消选择键 SelectionKey it.remove(); } } } }
服务端接收客户端的操作需要在判断 isAcceptable() 方法内将就绪的套接字通道以读操作注册到 选择器中
在判断 isReadable() 内从通道中获取数据
实现非阻塞式UDP协议网络通信
发送端
package com.jikedaquan.nonblockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.Scanner; public class TestDatagramSend { public static void main(String[] args) throws IOException { //获取通道 DatagramChannel dChannel=DatagramChannel.open(); //非阻塞 dChannel.configureBlocking(false); ByteBuffer buf=ByteBuffer.allocate(1024); Scanner scanner=new Scanner(System.in); while(scanner.hasNext()) { String str=scanner.next(); buf.put(str.getBytes()); buf.flip(); //发送数据到目标地址和端口 dChannel.send(buf,new InetSocketAddress("127.0.0.1", 1666)); buf.clear(); } dChannel.close(); } }
接收端
package com.jikedaquan.nonblockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; public class TestDatagramReceive { public static void main(String[] args) throws IOException { //获取通道 DatagramChannel dChannel=DatagramChannel.open(); dChannel.configureBlocking(false); //绑定监听端口 dChannel.bind(new InetSocketAddress(1666)); //获取选择器 Selector selector=Selector.open(); //读操作注册通道 dChannel.register(selector, SelectionKey.OP_READ); while(selector.select()>0) { Iterator<SelectionKey> it=selector.selectedKeys().iterator(); //迭代选择键 while(it.hasNext()) { SelectionKey sk=it.next(); //通道可读 if(sk.isReadable()) { ByteBuffer buf=ByteBuffer.allocate(1024); //接收数据存入缓冲区 dChannel.receive(buf); buf.flip(); System.out.println(new String(buf.array(),0,buf.limit())); buf.clear(); } } it.remove(); } } }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
一线互联网常见的14个Java面试题
跳槽不算频繁,但参加过不少面试(电话面试、face to face面试),面过大/小公司、互联网/传统软件公司,面糊过(眼高手低,缺乏实战经验,挂掉),也面过人,所幸未因失败而气馁,在此过程中不断查缺补漏,养成了踏实、追本溯源、持续改进的习惯,特此将自己经历过、构思过的一些面试题记录下来,如果答案有问题,欢迎拍砖讨论,希望能对找工作或者感兴趣的同学有所帮助,陆续整理中。 1. synchronized和reentrantlock异同 相同点 都实现了多线程同步和内存可见性语义 都是可重入锁 不同点 实现机制不同 synchronized通过java对象头锁标记和Monitor对象实现 reentrantlock通过CAS、ASQ(AbstractQueuedSynchronizer)和locksupport(用于阻塞和解除阻塞)实现 sync
- 下一篇
java多线程中显式锁的轮询检测策略
显式锁简介 java5.0之前,在协调对共享对象的访问时可以使用的机制只有synchronized和volatile,java5.0增加了一种新的机制:ReentrantLock。 锁像synchronized同步块一样,是一种线程同步机制,与synchronized不同的是ReentrantLock提供了一种无条件的、可轮询的、定时的以及可以中断的锁获取操作,并且所有的加锁和解锁的方法都是显式的,所以也叫显式锁。 synchronized的实现中包含了锁机制,但是锁的获取和释放不能人为的进行控制,所以当我们要定时获取锁,检测锁是否被占用时就应当使用显式锁。 显式锁涉及的类和接口 ReentrantLock实现了Lock接口,位于Java的J.U.C包中,包含了一下几个主要方法: 1、void lock(),获取锁; 2、void unlock
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- 设置Eclipse缩进为4个空格,增强代码规范