Netty源码分析之服务端启动流程
上节对Netty的做了简单介绍,这节分析下Netty启动流程,后面的源码分析都以Netty4.0.32版本为例,以下面启动代码为例子
public class TimeServer { public void bind(int port) throws Exception { // 1.配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //2.配置参数 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingServerHandler()) .childHandler(new ChildChannelHandler()); // 3.绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); //4. 等待服务端关闭 f.channel().closeFuture().sync(); } finally { //5.优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
组件介绍
EventLoopGroup
EventLoopGroup是Netty的线程池,为Acceptor Selector和IO Selector事件提供线程支持,EventLoopGroup在初始化的时候会创建相应配置数量的EventLoop来提供单个线程,EventLoop功能比较复杂,既是Select的轮询事件线程也是其他IO事件处理的线程。使用时调用EventLoopGroup#next()方法来获取EventLoop。Netty的线程比较复杂后面章节会详细说明。
NioServerSocketChannel
NioServerSocketChannel提供了JDK的Channel和selector的绑定功能,再绑定端口前会调用这个方法绑定操作。同时NioServerSocketChannel有个Unsafe内部类在初始化NioServerSocketChannel时会创建Unsafe作为他的成员变量。Unsafe提供了JDK层的io操作包括读、写、绑定端口等根据继承类不同的不同操作。
ChannelHandler
ChannelHandler是Netty的核心,它的继承包含了2个重要的处理器ChannelOutboundHandler和ChannelInboundHandler。ChannelOutboundHandler是写出IO事件的处理器。ChannelInboundHandler是写入IO事件的处理器同时也包含了多个触发器,包括连接断开触发器,异常触发器,注册完成触发器,取消注册触发器等,由于ChannelHandler比较复杂后面章节会详细说明。
ChannelFuture
ChannelFuture是Netty的异步事件等待器,Netty利用其来实现所有IO事件的异步化,ChannelFuture可以注册多个事件完成监听器,异步事件会在完成后回调监听器。
ServerBootstrap
ServerBootstrap是Netty配置及启动的入口,提供线程池、连接参数、线程池以及处理器的配置,同时提供了绑定端口的方法。
启动时序图
我们以b.bind(port)这行代码作为开始流程,时序图如下:
ServerBootstrap:
- 1.调用initAndRegister()创建并初始化channel
- 1.1.调用init(channel)设置配置并注册ChannelHandler
- 1.2.调用EventLoopGroup里的register(channel)将创建selector并注册到此channel上
- 2.调用doBind0()绑定端口并启动监听
- 2.1.调用Channel的bind(localAddress, promise)方法绑定端口
EventLoopGroup:
- 1.2.1.调用next()获取下个EventLoop
- 1.2.2.调用register(channel)委托给EventLoop去注册
EventLoop:
- 1.2.2.1.调用unsafe()获取channel中的unsafe对象
- 1.2.2.2.调用register(this, promise)委托unsafe注册
Channel:
- 1.2.2.2.1.调用doRegister()将创建selector并注册到此channel上
- 1.2.2.2.2.调用pipeline的fireChannelRegistered()触发注册成功事件
- 1.2.2.2.3.调用pipeline的fireChannelActive()触发激活成功事件
- 1.2.2.2.3.1.1.1.修改interestOps添加读操作位
- 2.1.1.委托ChannelPipeline调用bind(localAddress, promise)
- 2.1.1.1.1.1.调用doBind(localAddress)绑定端口
- 2.1.1.1.1.2.如果还未激活调用pipeline的fireChannelActive()触发激活成功事件
- 2.1.1.1.1.2.1.1.1修改interestOps添加读操作位
ChannelPipeline:
- 1.2.2.2.2.1.调用ChannelHandler链触发注册成功事件
- 1.2.2.2.3.1.调用ChannelHandler链触发激活成功事件,如果配置自动读则激活读事件
- 2.1.1.1.调用ChannelHandler链触发绑定事件
- 2.1.1.1.1.2.1.调用ChannelHandler链触发激活成功事件,如果配置自动读则激活读事件
HeadContext:
- 1.2.2.2.3.1.1.委托unsafe调用beginRead()开始读
- 2.1.1.1.1.委托Channel里的unsafe调用bind(localAddress, promise)
- 2.1.1.1.1.2.1.1.委托unsafe调用beginRead()开始读
源码分析
ServerBootstrap类的线程池、连接参数、线程池以及处理器的配置其实就是成员变量的复制这里就是不多讲,我们直接从b.bind()开始看。
b.bind()的方法直接将端口包装成类SocketAddress交给doBind()处理。doBind()源码如下
private ChannelFuture doBind(final SocketAddress localAddress) { //初始化Channel并注册Selector final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //绑定监听端口并开始监听 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
这个方法主要做了2件事
- 调用initAndRegister()初始化Channel并注册Selector
- 调用doBind0()绑定监听端口并开始监听
注册Selector是默认是异步进行的这里Netty做了一个处理,如果注册Selector已完成则同步调用doBind0()否则注册监听器等待Selector注册完成后调用doBind0(),这样保证整个doBind()方法的异步性。
接下来我们看下initAndRegister()的实现,源码如下:
final ChannelFuture initAndRegister() { //初始化channel final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } //注册Selector ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
initAndRegister()方法也是做了2件事
- 调用init()初始化Channel
- 调用group().register()注册Selector
channelFactory().newChannel()是创建你配置的Channel这里源码配置了NioServerSocketChannel,而init()的具体实现则是在NioServerSocketChannel中,init()源码如下:
void init(Channel channel) throws Exception { ... ChannelPipeline p = channel.pipeline(); ... //将从线程组,从处理器,从配置,从属性封装成ServerBootstrapAcceptor处理器,共后续IOSelector使用。 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
在上节中我们讲了Neety服务端线程模式可以是主从模型,所以它的配置也分主从配置,init()主要做的将主处理器(ChannelHandler)添加到主ChannelPipeline里,将 从线程组,从处理器(ChannelHandler),从配置,从属性封装成ServerBootstrapAcceptor处理器添加到主ChannelPipeline里,供后续触发客户端连接事件使用。
group().register()先是取了主线程组EventLoopGroup委托EventLoopGroup注册Selector,主线程组中也是调用next()委托给单线程EventLoop去处理
next().register(channel);
最后在EventLoop的register()中委托给Channel中的Unsafe处理(Unsafe是在Channel创建的时候被创建的)
channel.unsafe().register(this, promise);
我们来看下Unsafe中的register()的源码:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; //如果eventLoop是当前线程直接执行,否则交给传eventLoop线程处理 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
Unsafe中的register()做的就是判断next()获取的线程是否是当前线程,如果是同步执行注册,否则异步执行注册,按例子中默认配置会异步调用register0()注册。
我们来看下register0()的源码
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //调用JDK去注册Selector doRegister(); neverRegistered = false; registered = true; //设置注册成功通知监听器 safeSetSuccess(promise); //触发注册成功事件 pipeline.fireChannelRegistered(); //如果是第一次则触发激活成功事件 if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
register0()做了4件事
- 调用JDK去注册Selector,源码如下
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
- 设置注册成功通知监听器
- 触发注册成功事件
- 如果绑定成功则触发激活成功事件
触发注册成功事件和如果绑定成功触发激活成功事件其实是在主pipeline中链式调用用户的配置ChannelHandler,调用过程如下
图1
这里会有个疑问绑定是在注册selector之后进行的为什么这里可以触发激活成功事件,其实在safeSetSuccess(promise)代码执行之后已经通知监听器并开始执行绑定相关的代码了。
触发激活成功事件后如果配置了会触发开始读事件
public ChannelPipeline fireChannelActive() { head.fireChannelActive(); //如果配置了会触发开始读事件 if (channel.config().isAutoRead()) { channel.read(); } return this; }
读事件也是链式调用调用过程如下:
图2
可以看到最后调用HeadContext里的read()方法
public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
委托给unsafe去处理,处理的方式就是修改interestOps添加读操作位,由IOSelector去触发读事件
selectionKey.interestOps(interestOps | readInterestOp);
在NioServerSocketChannel中readInterestOp其实对应的是JDK中的SelectionKey.OP_ACCEPT,在ServerSocketChannel构造方法中定义:
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
服务端启动后最先关心是客户端连接的,如果没有客户端的连接,后续无法IO操作,所以这里注册了SelectionKey.OP_ACCEPT来监听客户端的连接。
以上就是初始化Channel并注册Selector的流程
下面说下调用doBind0()绑定监听端口并开始监听流程,doBind0()代码如下:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
绑定端口是异步进行的,这里调用channel.eventLoop()获取的线程默认就是之前注册Selector的流程中next()获取的线程。绑定的流程其实也是在主pipeline中链式调用用户的配置ChannelHandler,如图2,可以看到最后调用HeadContext里的bind()方法
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
也是委托给unsafe去处理,处理的方式就是调用JDK里的绑定接口去绑定
javaChannel().socket().bind(localAddress);
如果绑定成功且之前没触发激活成功事件则触发之,这里的流程跟之前讲的一样就不累述。
以上就是绑定监听端口并开始监听的流程。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Android调试神器stetho使用详解和改造
本文由云+社区发表 作者:NaOH 概述 stetho是Facebook开源的一个Android调试工具,项目地址:facebook/stetho 通过Stetho,开发者可以使用chrome的inspect功能,对Android应用进行调试和查看。 功能概述 stetho提供的功能主要有: Network Inspection:网络抓包,如果你使用的是当前流行的OkHttp或者Android自带的 HttpURLConnection,你可以轻松地在chrome inspect窗口的network一栏抓到所有的网络请求和回包,还用啥Postman,还用啥Fiddler哦(开个玩笑,一些场合还是需要用的,毕竟Stetho Network Inspection 只是用来查看回报和发送数据是否有误,在开发初期,调试API还是用Postman快一点) Database Inspection:数据库查看,可以直接看到当前应用的sqlite数据库,而且是可视化的,不需要再下什么奇怪的工具或者用命令行看了。这个确实非常棒! View Hierarchy:布局层级查看,免去使用查看布局边界的花花绿绿带...
- 下一篇
负载均衡获得真实源IP的6种方法
除了X-FORWARD-FOR,负载均衡中获得真实源IP的方法还有很多种。 本文抛砖引玉,主要介绍获得真实源IP的多种方法,而不是具体配置。 负载均衡获得真实IP的方法有很多种,将形成专题文章。 本文为第一篇,主要做介绍和优劣对比。 小慢哥的原创文章,欢迎转载 获得真实IP的6种方法 当数据包从负载均衡器往后端转发时候,真实源IP可在L3、L4、L7实现,并且分别有2种方法可以获得真实IP,因此共有6种方法: 保持L3层源IP不变,根据连接次数可以分为 一次连接模式,如lvs 二次连接模式,如haproxy的透明模式 在L4层数据里,添加源IP信息,有2种模式 在4层的option字段里增加源IP信息,比如tcp option、udp option 在4层末尾和7层开头之间,增加proxy protocol信息 在L7层数据里,增加源IP信息,有2种模式 协议自带,例如HTTP的X-FORWARD-FOR 业务程序自行实现 一次连接与二次连接 一次连接:负载均衡器对数据包仅做转发,而不对后端重新发起三次握手 二次连接:和一次连接相对应,在tcp转发时候,对后端重新进行了三次握手。上面所...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题