您现在的位置是:首页 > 文章详情

高性能IO框架Netty:手把手教你解决 “粘包/半包” 问题!

日期:2021-06-23点击:386
前言:demo演示

首先,我们来看个demo

1、EchoServer

public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws InterruptedException { EchoServer echoServer = new EchoServer(9999); System.out.println("服务器即将启动"); echoServer.start(); System.out.println("服务器关闭"); } public void start() throws InterruptedException { final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(port))/*指定服务器监听端口*/ /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel, 所以下面这段代码的作用就是为这个子channel增加handle*/ .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler);/*添加到该子channel的pipeline的尾部*/ } }); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ System.out.println("服务器启动完成,等待客户端的连接和数据....."); f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } } }

2、EchoServerHandler

@ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); /*** 服务端读取到网络数据后的处理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept[" + request + "] and the counter is:" + counter.incrementAndGet()); String resp = "Hello," + request + ". Welcome to Netty World!" + System.getProperty("line.separator"); ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } /*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

使用netty实现了个服务端,当接收到客户端的消息是,打印出来请求的内容,并统计接收请求的次数。

3、EchoClient

public class EchoClient { private final int port; private final String host; public EchoClient(int port, String host) { this.port = port; this.host = host; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { final Bootstrap b = new Bootstrap(); /*客户端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/ .remoteAddress(new InetSocketAddress(host, port))/*配置要连接服务器的ip地址和端口*/ .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { new EchoClient(9999, "127.0.0.1").start(); } }

4、EchoClientHandler

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import java.util.concurrent.atomic.AtomicInteger; public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private AtomicInteger counter = new AtomicInteger(0); /*** 客户端读取到网络数据后的处理*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8) + "] and the counter is:" + counter.incrementAndGet()); } /*** 客户端被通知channel活跃后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; String request = "test1,test2,test3,test4" + System.getProperty("line.separator"); for (int i = 0; i < 100; i++) { msg = Unpooled.buffer(request.length()); msg.writeBytes(request.getBytes()); ctx.writeAndFlush(msg); } } /*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

使用netty实现了个客户端,链接建立完成之后向服务端发送消息。循环100次。并且打印服务端返回的消息。并统计返回次数。

执行结果

服务端输出

客户端打印

结果发现,我们客户单发送了100次数据,但实际上只接收了30次。而且每次消息发送的是test1,test2,test3,test4,test5,但实际接受的却有很多相链接起来的。这是为什么呢?为什么不是100次test1,test2,test3,test4,test5呢?这就是TCP传输的粘包/半包问题。

一、什么是TCP粘包半包?

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

  • 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
  • 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
  • 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
  • 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

二、TCP粘包/半包发生的原因

由于TCP协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会维持一个连接(Channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP的网络延迟要UDP的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象

UDP:本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。

分包产生的原因就简单的多:可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。

更具体的原因有三个,分别如下。

  • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
  • 进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长
  • 以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成托干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。
三、解决粘包半包问题

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

1、在包尾增加分割符

在包尾增加分割符,比如回车换行符进行分割,例如FTP协议;

demo如下:

LineBaseEchoServer

public class LineBaseEchoServer { public static final int PORT = 9998; public static void main(String[] args) throws InterruptedException { LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer(); System.out.println("服务器即将启动"); lineBaseEchoServer.start(); } public void start() throws InterruptedException { final LineBaseServerHandler serverHandler = new LineBaseServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/ /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel, 所以下面这段代码的作用就是为这个子channel增加handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ System.out.println("服务器启动完成,等待客户端的连接和数据....."); f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { //添加换行解码器 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new LineBaseServerHandler()); } } }

LineBaseEchoServer

public class LineBaseEchoServer { public static final int PORT = 9998; public static void main(String[] args) throws InterruptedException { LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer(); System.out.println("服务器即将启动"); lineBaseEchoServer.start(); } public void start() throws InterruptedException { final LineBaseServerHandler serverHandler = new LineBaseServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/ /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel, 所以下面这段代码的作用就是为这个子channel增加handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ System.out.println("服务器启动完成,等待客户端的连接和数据....."); f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { //添加换行解码器 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new LineBaseServerHandler()); } } }
 

LineBaseEchoClient

public class LineBaseEchoClient { private final String host; public LineBaseEchoClient(String host) { this.host = host; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { final Bootstrap b = new Bootstrap(); b.group(group)/*将线程组传入*/ .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/ .remoteAddress(new InetSocketAddress(host, LineBaseEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/ .handler(new ChannelInitializerImp()); ChannelFuture f = b.connect().sync(); System.out.println("已连接到服务器....."); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { //回车符做了分割 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new LineBaseClientHandler()); } } public static void main(String[] args) throws InterruptedException { new LineBaseEchoClient("127.0.0.1").start(); } }

LineBaseClientHandler

 public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private AtomicInteger counter = new AtomicInteger(0); /*** 客户端读取到网络数据后的处理*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8) + "] and the counter is:" + counter.incrementAndGet()); ctx.close(); } /*** 客户端被通知channel活跃后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; String request = "test1,test2,test3,test4,test5" + System.getProperty("line.separator"); for (int i = 0; i < 10; i++) { Thread.sleep(500); System.out.println(System.currentTimeMillis() + ":即将发送数据:" + request); msg = Unpooled.buffer(request.length()); msg.writeBytes(request.getBytes()); ctx.writeAndFlush(msg); } } /*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 

执行效果

2、消息定长

例如每个报文的大小为固定长度200字节,如果不够,空位补空格;

服务端只需将服务端的ChannelInitializerImp 解码器new LineBasedFrameDecoder(1024)替换为new FixedLengthFrameDecoder( FixedLengthEchoClient.REQUEST.length())即可。

 private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { //添加定长报文长度解码器,长度问请求的长度 ch.pipeline().addLast( new FixedLengthFrameDecoder( FixedLengthEchoClient.REQUEST.length())); ch.pipeline().addLast(new FixedLengthServerHandler()); } } 

3、将消息分为消息头和消息体

消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度。类似与第二条,只是我们按照头部的content-length长度进行定长解码。

原文链接:https://blog.51cto.com/u_15152535/2941898
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章