您现在的位置是:首页 > 文章详情

Netty源码分析之服务端启动流程

日期:2019-02-16点击:498

上节对Netty的做了简单介绍,这节分析下Netty启动流程,后面的源码分析都以Netty4.0.32版本为例,以下面启动代码为例子

public class TimeServer { public void bind(int port) throws Exception { // 1.配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //2.配置参数 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingServerHandler()) .childHandler(new ChildChannelHandler()); // 3.绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); //4. 等待服务端关闭 f.channel().closeFuture().sync(); } finally { //5.优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } 

组件介绍

EventLoopGroup

EventLoopGroup是Netty的线程池,为Acceptor Selector和IO Selector事件提供线程支持,EventLoopGroup在初始化的时候会创建相应配置数量的EventLoop来提供单个线程,EventLoop功能比较复杂,既是Select的轮询事件线程也是其他IO事件处理的线程。使用时调用EventLoopGroup#next()方法来获取EventLoop。Netty的线程比较复杂后面章节会详细说明。

NioServerSocketChannel

NioServerSocketChannel提供了JDK的Channel和selector的绑定功能,再绑定端口前会调用这个方法绑定操作。同时NioServerSocketChannel有个Unsafe内部类在初始化NioServerSocketChannel时会创建Unsafe作为他的成员变量。Unsafe提供了JDK层的io操作包括读、写、绑定端口等根据继承类不同的不同操作。

ChannelHandler

ChannelHandler是Netty的核心,它的继承包含了2个重要的处理器ChannelOutboundHandler和ChannelInboundHandler。ChannelOutboundHandler是写出IO事件的处理器。ChannelInboundHandler是写入IO事件的处理器同时也包含了多个触发器,包括连接断开触发器,异常触发器,注册完成触发器,取消注册触发器等,由于ChannelHandler比较复杂后面章节会详细说明。

ChannelFuture

ChannelFuture是Netty的异步事件等待器,Netty利用其来实现所有IO事件的异步化,ChannelFuture可以注册多个事件完成监听器,异步事件会在完成后回调监听器。

ServerBootstrap

ServerBootstrap是Netty配置及启动的入口,提供线程池、连接参数、线程池以及处理器的配置,同时提供了绑定端口的方法。

启动时序图

我们以b.bind(port)这行代码作为开始流程,时序图如下:

ServerBootstrap:

  • 1.调用initAndRegister()创建并初始化channel
  • 1.1.调用init(channel)设置配置并注册ChannelHandler
  • 1.2.调用EventLoopGroup里的register(channel)将创建selector并注册到此channel上
  • 2.调用doBind0()绑定端口并启动监听
  • 2.1.调用Channel的bind(localAddress, promise)方法绑定端口

EventLoopGroup:

  • 1.2.1.调用next()获取下个EventLoop
  • 1.2.2.调用register(channel)委托给EventLoop去注册

EventLoop:

  • 1.2.2.1.调用unsafe()获取channel中的unsafe对象
  • 1.2.2.2.调用register(this, promise)委托unsafe注册

Channel:

  • 1.2.2.2.1.调用doRegister()将创建selector并注册到此channel上
  • 1.2.2.2.2.调用pipeline的fireChannelRegistered()触发注册成功事件
  • 1.2.2.2.3.调用pipeline的fireChannelActive()触发激活成功事件
  • 1.2.2.2.3.1.1.1.修改interestOps添加读操作位
  • 2.1.1.委托ChannelPipeline调用bind(localAddress, promise)
  • 2.1.1.1.1.1.调用doBind(localAddress)绑定端口
  • 2.1.1.1.1.2.如果还未激活调用pipeline的fireChannelActive()触发激活成功事件
  • 2.1.1.1.1.2.1.1.1修改interestOps添加读操作位

ChannelPipeline:

  • 1.2.2.2.2.1.调用ChannelHandler链触发注册成功事件
  • 1.2.2.2.3.1.调用ChannelHandler链触发激活成功事件,如果配置自动读则激活读事件
  • 2.1.1.1.调用ChannelHandler链触发绑定事件
  • 2.1.1.1.1.2.1.调用ChannelHandler链触发激活成功事件,如果配置自动读则激活读事件

HeadContext:

  • 1.2.2.2.3.1.1.委托unsafe调用beginRead()开始读
  • 2.1.1.1.1.委托Channel里的unsafe调用bind(localAddress, promise)
  • 2.1.1.1.1.2.1.1.委托unsafe调用beginRead()开始读

源码分析

ServerBootstrap类的线程池、连接参数、线程池以及处理器的配置其实就是成员变量的复制这里就是不多讲,我们直接从b.bind()开始看。

b.bind()的方法直接将端口包装成类SocketAddress交给doBind()处理。doBind()源码如下

 private ChannelFuture doBind(final SocketAddress localAddress) { //初始化Channel并注册Selector final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //绑定监听端口并开始监听 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } } 

这个方法主要做了2件事

  • 调用initAndRegister()初始化Channel并注册Selector
  • 调用doBind0()绑定监听端口并开始监听

注册Selector是默认是异步进行的这里Netty做了一个处理,如果注册Selector已完成则同步调用doBind0()否则注册监听器等待Selector注册完成后调用doBind0(),这样保证整个doBind()方法的异步性。

接下来我们看下initAndRegister()的实现,源码如下:

 final ChannelFuture initAndRegister() { //初始化channel final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } //注册Selector ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } 

initAndRegister()方法也是做了2件事

  • 调用init()初始化Channel
  • 调用group().register()注册Selector

channelFactory().newChannel()是创建你配置的Channel这里源码配置了NioServerSocketChannel,而init()的具体实现则是在NioServerSocketChannel中,init()源码如下:

 void init(Channel channel) throws Exception { ... ChannelPipeline p = channel.pipeline(); ... //将从线程组,从处理器,从配置,从属性封装成ServerBootstrapAcceptor处理器,共后续IOSelector使用。 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } 

在上节中我们讲了Neety服务端线程模式可以是主从模型,所以它的配置也分主从配置,init()主要做的将主处理器(ChannelHandler)添加到主ChannelPipeline里,将 从线程组,从处理器(ChannelHandler),从配置,从属性封装成ServerBootstrapAcceptor处理器添加到主ChannelPipeline里,供后续触发客户端连接事件使用。

group().register()先是取了主线程组EventLoopGroup委托EventLoopGroup注册Selector,主线程组中也是调用next()委托给单线程EventLoop去处理

 next().register(channel); 

最后在EventLoop的register()中委托给Channel中的Unsafe处理(Unsafe是在Channel创建的时候被创建的)

 channel.unsafe().register(this, promise); 

我们来看下Unsafe中的register()的源码:

 public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; //如果eventLoop是当前线程直接执行,否则交给传eventLoop线程处理 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } 

Unsafe中的register()做的就是判断next()获取的线程是否是当前线程,如果是同步执行注册,否则异步执行注册,按例子中默认配置会异步调用register0()注册。

我们来看下register0()的源码

 private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //调用JDK去注册Selector doRegister(); neverRegistered = false; registered = true; //设置注册成功通知监听器 safeSetSuccess(promise); //触发注册成功事件 pipeline.fireChannelRegistered(); //如果是第一次则触发激活成功事件 if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } 

register0()做了4件事

  • 调用JDK去注册Selector,源码如下
selectionKey = javaChannel().register(eventLoop().selector, 0, this); 
  • 设置注册成功通知监听器
  • 触发注册成功事件
  • 如果绑定成功则触发激活成功事件

触发注册成功事件和如果绑定成功触发激活成功事件其实是在主pipeline中链式调用用户的配置ChannelHandler,调用过程如下

图1

这里会有个疑问绑定是在注册selector之后进行的为什么这里可以触发激活成功事件,其实在safeSetSuccess(promise)代码执行之后已经通知监听器并开始执行绑定相关的代码了。

触发激活成功事件后如果配置了会触发开始读事件

 public ChannelPipeline fireChannelActive() { head.fireChannelActive(); //如果配置了会触发开始读事件 if (channel.config().isAutoRead()) { channel.read(); } return this; } 

读事件也是链式调用调用过程如下:

图2

可以看到最后调用HeadContext里的read()方法

public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); } 

委托给unsafe去处理,处理的方式就是修改interestOps添加读操作位,由IOSelector去触发读事件

selectionKey.interestOps(interestOps | readInterestOp); 

在NioServerSocketChannel中readInterestOp其实对应的是JDK中的SelectionKey.OP_ACCEPT,在ServerSocketChannel构造方法中定义:

 public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } 

服务端启动后最先关心是客户端连接的,如果没有客户端的连接,后续无法IO操作,所以这里注册了SelectionKey.OP_ACCEPT来监听客户端的连接。

以上就是初始化Channel并注册Selector的流程

下面说下调用doBind0()绑定监听端口并开始监听流程,doBind0()代码如下:

 private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } 

绑定端口是异步进行的,这里调用channel.eventLoop()获取的线程默认就是之前注册Selector的流程中next()获取的线程。绑定的流程其实也是在主pipeline中链式调用用户的配置ChannelHandler,如图2,可以看到最后调用HeadContext里的bind()方法

public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } 

也是委托给unsafe去处理,处理的方式就是调用JDK里的绑定接口去绑定

javaChannel().socket().bind(localAddress); 

如果绑定成功且之前没触发激活成功事件则触发之,这里的流程跟之前讲的一样就不累述。

以上就是绑定监听端口并开始监听的流程。

原文链接:https://my.oschina.net/u/945573/blog/3010849
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章