一文详解 Netty 组件
作者:京东物流 张弓言
一、背景
Netty 是一款优秀的高性能网络框架,内部通过 NIO 的方式来处理网络请求,在高负载下也能可靠和高效地处理 I/O 操作
作为较底层的网络通信框架,其被广泛应用在各种中间件的开发中,比如 RPC框架、MQ、Elasticsearch等,这些中间件框架的底层网络通信模块大都利用到了 Netty 强大的网络抽象
下面这篇文章将主要对 Netty 中的各个组件进行分析,并在介绍完了各个组件之后,通过 JSF 这个 RPC 框架为例来分析 Netty 的使用,希望让大家对 Netty 能有一个清晰的了解
二、Netty Server
通过 Netty 来构建一个简易服务端是比较简单的,代码如下:
public class NettyServer { public static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class); public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ChannelFuture channelFuture = serverBootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelHandlerAdapter() { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { LOGGER.info("Handler Added"); } }) .childHandler(new ServerChannelInitializer()) .bind(8100); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { LOGGER.info("Netty Server Start !"); } } }); try { channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
上面代码的主要逻辑如下:
- 新建服务端引导启动类 ServerBootstrap,内部封装了各个组件,用来进行服务端的启动
- 新建了两个 EventLoopGroup 用来进行连接处理,此时可以简单的将 EventLoopGroup 理解为多个线程的集合。bossGroup 中的线程用来处理新连接的建立,当新连接建立后,workerGroup 中的每个线程则都会和唯一的客户端 Channel 连接进行绑定,用来处理该 Channel 上的读、写事件
- 指定服务端创建的 Channel 类型为 NioServerSocketChannel
- childOption 用来配置客户端连接的 NioSocketChannel 底层网络参数
- handler 用来指定针对服务端 Channel 的处理器,内部定义了一系列的回调方法,会在服务端 Channel 发生指定事件时进行回调
- childHandler 用来指定客户端 Channel 的处理器,当客户端 Channel 中发生指定事件时,会进行回调
- bind 指定服务端监听端口号
三、Netty Client
public class HelloClient { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { // 1. 启动类 ChannelFuture channelFuture = new Bootstrap() // 2. 添加 EventLoop .group(workGroup) // 3. 选择客户端 channel 实现 .channel(NioSocketChannel.class) // 4. 添加处理器 .handler(new ChannelInitializer<NioSocketChannel>() { @Override // 在连接建立后被调用 protected void initChannel(NioSocketChannel ch) throws Exception { ZAS ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new StringEncoder()); } }) // 5. 连接到服务器 .connect(new InetSocketAddress("localhost", 8100)); channelFuture.addListener(future -> { if (future.isSuccess()) { ((ChannelFuture) future).channel().writeAndFlush("hello"); } }); channelFuture.channel().closeFuture().sync(); } finally { workGroup.shutdownGracefully(); } } }
上面代码的主要逻辑如下:
- 新建 Bootstrap 用来进行客户端启动
- group() 指定一个 NioEventLoopGroup 实例,用来处理客户端连接的建立和后续事件处理
- handler() 指定 Channel 处理器,
- 当将客户端启动类中的各个属性都设置完毕后,调用 connect() 方法进行服务端连接
从上面的的两个例子可以看出,如果想通过 Netty 实现一个简易的服务器其实是非常简单的,只需要在启动引导类中设置好对应属性,然后完成端口绑定就可以实现。但也正是因为这种简易的实现方式,导致很多人在学习 Netty 的过程中,发现代码是写的出来,但是对内部的组件有什么作用以及为什么这么写可能就不是很清楚了,因此希望通过这一系列文章来加深大家对 Netty 的理解
四、Netty 基本组件
Channel
Netty 中的 Channel 可以看成网络编程中的 Socket,其提供了一系列 IO 操作的 API,比如 read、write、bind、connect 等,大大降低了直接使用 Socket 类的复杂性
整体类继承关系如下:
从上面的继承关系可以看出,NioSocketChannel 和 NioServerSocketChannel 分别对应客户端和服务端的 Channel,两者的直接父类不一致,因此对外提供的功能也是不相同的。比如当发生 read 事件时,NioServerSocketChannel 的主要逻辑就是建立新的连接,而 NioSocketChannel 则是读取传输的字节进行业务处理
下面就以 NioServerSocketChannel 为例,带大家了解下该类的初始化过程,整体流程如下:
- 启动引导类中通过 channel() 指定底层创建的 Channel 类型
- 根据指定的 Channel 类型创建出 ChannelFactory,后续通过该工厂类进行 Channel 的实例化
- 实例化 Channel
channel() 指定 ChannelFactory 类型
在上面的服务端启动过程中,ServerBootstrap 调用 channel() 方法并传入 NioServerSocketChannel,其底层代码逻辑为:
public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); } // ReflectiveChannelFactory 构造方法 public ReflectiveChannelFactory(Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } }
整体逻辑很简单,通过传入的 Class 对象指定一个 Channel 反射工厂,后续调用工厂方法获取指定类型的 Channel 对象
channel 实例化
当服务端启动引导类 ServerBootstrap 调用 bind() 方法之后,内部会走到 Channel 的实例化过程,代码精简如下:
// channel 初始化流程,内部通过 channelFactory 构造 final ChannelFuture initAndRegister() { channel = channelFactory.newChannel(); } // channelFactory 的 newChannel 方法逻辑 public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }
ChannelFactory 的整体逻辑就是通过反射的方式新建 Channel 对象,而 Channel 对象的类型则是在启动引导类中通过 channel() 方法进行指定的
在实例化 Channel 的过程中,会对其内部的一些属性进行初始化,而对这些属性的了解,可以使我们对 Netty 中各个组件的作用范围有一个更加清晰的理解,下面看下 NioServerSocketChannel 的构造函数源码
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
上述源码就是一层一层的父类构造,可以对照前面的类关系图进行阅读
NioServerSocketChannel 实例化过程中主要完成了以下内部属性的初始化:
- unsafe 属性进行赋值为 NioMessageUnsafe,后续 Channel 上事件处理的主要逻辑都是由该类完成
- pipeline 属性进行初始化赋值,pipeline 是 Channel 中特别重要的一个属性,后续的所有业务处理器都是通过该 pipeline 组织的
- 指定当前 Channel 的 readInterestOp 属性为 SelectionKey.OP_ACCEPT,用于后续绑定到 Selector 时指定当前 Channel 监听的事件类型
- 指定当前 Channel 非阻塞,ch.configureBlocking(false)
总结
对于 Channel 的实例化流程可以总结如下:
- 启动引导类中通过 channel() 方法指定生成的 ChannelFactory 类型
- 通过 ChannelFactory 来构造对应 Channel,并在实例化的过程中初始化了一些重要属性,比如 pipeline
ChannelPipeline
ChannelPipeline 也是 Netty 中的一个比较重要的组件,从上面的 Channel 实例化过程可以看出,每一个 Channel 实例中都会包含一个对应的 ChannelPipeline 属性
ChannelPipeline 初始化
ChannelPipeline 底层初始化源码:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
从 ChannelPipeline 的构造函数可以看出,每一个 ChannelPipeline 底层都是一个双向链表结构,默认会包含 head 和 tail 头尾节点,用来进行一些默认的逻辑处理,处理细节会在后续文章中展现
addLast()
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } // 回调 ChannelHandler 中的 handlerAdded() 方法 callHandlerAdded0(newCtx); return this; } private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用来进行业务处理
整个方法的逻辑为:
- 判断当前 ChannelHandler 是否已经添加
- 将当前 ChannelHandler 包装成 ChannelHandlerContext,并将其添加到 ChannelPipeline 的双向链表中
- 回调添加的 ChannelHandler 中的 handlerAdded() 方法
Channel、ChannelPipeline、ChannelHandler 关系
Channel、ChannelPipeline和 ChannelHandler 三者的关系如图所示:
- 每一个 Channel 中都会包含一个 ChannelPipeline 属性
- ChannelPipeline 是一个双向链表结构,默认会包含 HeadContext 和 TailContext 两个节点
- 当向 ChannelPipeline 中添加 ChannelHandler 时,会包装成 ChannelContext 插入到 ChannelPipeline 链表中
- 当 Channel 中发生指定事件时,该事件就会在 ChannelPipeline 中沿着双向链表进行传播,调用各个 ChannelHandler 中的指定方法,完成相应的业务处理
Netty 正是通过 ChannelPipeline 这一结构为用户提供了自定义业务逻辑的扩展点,用户只需要向 ChannelPipeline 中添加处理对应业务逻辑的 ChannelHandler,之后当指定事件发生时,该 ChannelHandler 中的对应方法就会进行回调,实现业务的处理
ChannelHandler
ChannelHandler 是 Netty 中业务处理的核心类,当有 IO 事件发生时,该事件会在 ChannelPipeline 中进行传播,并依次调用到 ChannelHandler 中的指定方法
ChannelHandler 的类继承关系如下:
从上面的类继承关系可以看出,ChannelHandler 大致可以分为 ChannelInboundHandler 和 ChannelOutboundHandler,分别用来处理读、写事件
ChannInboundHandler
public interface ChannelInboundHandler extends ChannelHandler { void channelRegistered(ChannelHandlerContext ctx) throws Exception; void channelUnregistered(ChannelHandlerContext ctx) throws Exception; void channelActive(ChannelHandlerContext ctx) throws Exception; void channelInactive(ChannelHandlerContext ctx) throws Exception; void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; void channelReadComplete(ChannelHandlerContext ctx) throws Exception; void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; @Override @SuppressWarnings("deprecation") void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
在 ChannelInboundHandler 中定义了一系列的回调方法,用户可以实现该接口并重写相应的方法来自定义的业务逻辑。
重写方法逻辑是简单的,但很多人其实不清楚的是这些回调方法到底在什么场景下会被调用,如何调用,只有了解了这些回调方法的调用时机,才能在更适宜的地方完成相应功能
channelRegistered
channelRegistered() 从方法名理解是当 Channel 完成注册之后会被调用,那么何为 Channel 注册?
下面就以 Netty 服务端启动过程中的部分源码为例(详细源码分析会在后续文章中),看下 channelRegistered() 的调用时机
在 Netty 服务端启动时,会调用到 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法,精简代码如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // neverRegistered 初始值为 true boolean firstRegistration = neverRegistered; // 将 Channel 绑定到对应 eventLoop 中的 Selector 上 doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); // 调用 ChannelHandler 中的 ChannelRegistered() pipeline.fireChannelRegistered(); } } protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
从 Netty 底层的 register() 方法可以看出,ChannelHandler 中的 ChannelRegistered() 调用时机是在调用 pipeline.fireChannelRegistered() 时触发的,此时已经完成的逻辑为:
- 通过传入的 EventLoopGroup 得到了该 Channel 对应的 EventLoop,并与Channel 中的对应属性完成了绑定;AbstractChannel.this.eventLoop = eventLoop 逻辑
- 当前 Channel 已经绑定到了对应 EventLoop 中的 Selector 上;doRegister() 逻辑
- ChannelHandler 中的 handlerAdded() 方法已经完成了回调;pipeline.invokeHandlerAddedIfNeeded() 逻辑
因此当 Channel 和对应的 Selector 完成了绑定,Channel 中 pipeline 上绑定的 ChannelHandler 的channelRegisted() 方法就会进行回调
channelActive
上面已经分析了channelRegistered() 方法的调用时机,也就是当 Channel 绑定到了对应 Selector 上之后就会进行回调,下面开始分析 channelActive() 方法的调用时机
对于服务端 Channel,前面还只是将 Channel 注册到了 Selector 上,还没有调用到 bind() 方法完成真正的底层端口绑定,那么有没有可能当服务端 Channel 完成端口监听之后,就会调用到 channelActive() 方法呢?
下面继续分析,在上面完成了 Channel 和 Selector 的注册之后,Netty 服务端启动过程中会继续调用到 io.netty.channel.AbstractChannel.AbstractUnsafe#bind 逻辑:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
在该方法中完成了以下逻辑:
- 完成了 Channel 和本地端口的绑定
- 绑定成功后,isActive() 方法返回 true,此时发布 ChannelActive 事件,进行方法回调
- safeSetSuccess() 中会回调到服务端启动过程中添加的 listener 方法,表明当前 Channel 完成了端口绑定
总结:
当 Channel 调用了 bind() 方法完成端口绑定之后,channelActive() 方法会进行回调
channelRead
该方法的调用时机,服务端和客户端是不一致的
服务端 channelRead
服务端 Channel 绑定到 Selector 上时监听的是 Accept 事件,当客户端有新连接接入时,会回调 channelRead() 方法,完成新连接的接入
Netty 在服务端启动过程中,会默认添加一个 ChannelHandler io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor 来处理新连接的接入
客户端 channelRead
当服务端处理完 Accept 事件后,会生成一个和客户端通信的 Channel,该 Channel 也会注册到对应的 Selector 上,并监听 read 事件
当客户端向该 Channel 中发送数据时就会触发 read 事件,调用到 channelRead() 方法(Netty 内部的源码处理会在后续的文章中进行分析)
exceptionCaught
当前 ChannelHandler 中各回调方法处理过程中如果发生了异常就会回调该方法

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
CPU推理|使用英特尔 Sapphire Rapids 加速 PyTorch Transformers
在 最近的一篇文章 中,我们介绍了代号为 Sapphire Rapids 的第四代英特尔至强 CPU 及其新的先进矩阵扩展 (AMX) 指令集。通过使用 Amazon EC2 上的 Sapphire Rapids 服务器集群并结合相应的英特尔优化库,如 英特尔 PyTorch 扩展 (IPEX),我们展示了如何使用 CPU 进行高效的分布式大规模训练,与上一代至强 (Ice Lake) 相比,Sapphire Rapids 实现了 8 倍的加速,取得了近线性的扩展比。 本文我们将重点关注推理。使用基于 PyTorch 的 Hugging Face transformers 模型,我们首先在 Ice Lake 服务器上分别测量它们在长、短两种文本序列上的性能。然后,我们在 Sapphire Rapids 服务器和最新版本的 Hugging Face Optimum Intel 上执行相同的测试,并比较两代 CPU 的性能。这里,Optimum Intel 是一个专用于英特尔平台的硬件加速开源库。 让我们开始吧! 为什么你应该考虑使用 CPU 推理 在决定使用 CPU 还是 GPU 进行深...
- 下一篇
前端监控之性能与异常
作者:京东零售 李菲菲 1 前言 现有的大部分监控方案都是针对服务端的,而针对前端的监控很少,诸如线上页面的白屏时间是多少、静态资源的加载情况如何、接口请求耗时好久、什么时候挂掉了、为什么挂掉,这些都不清楚。 同时,在产品推广过程中,经常需要统计页面的使用情况及用户行为,从而可以从运营和产品的角度去了解用户群体,进而迭代升级产品,使其更加贴近用户,为业务的扩展提供更多可能性。 因而,我们需要一个前端的页面监控系统,持续监控和预警页面性能的状况,并且在发现瓶颈时用于指导优化工作。 2 前端监控目标 前端监控主要包含两大块:性能监控及异常监控 保证稳定性(异常监控) 错误监控包括 JavaScript 代码错误,Promsie 错误,接口(XHR,fetch)错误,资源加载错误(script,link等)等,这些错误大多会导致页面功能异常甚至白屏。 提升用户体验(性能监控) 性能监控包括页面的加载时间,接口响应时间等,侧面反应了用户体验的好坏。 3 性能监控 3.1 简单描述页面加载 简单看一下,从输入url到页面加载完成的过程如下: 首先需要通过 DNS(域名解析系统)将 URL 解析为...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程