源码分析Dubbo网络通信篇NettyServer、HeaderExchangeServer
本文主要分析一下NettyServer,HeaderExchangeServer实现细节。
1、NettyServer
NettyServer整个类图如下: 首先从全貌上大概看一下NettyServer对象所持有的属性:
- AbstractPeer
- private final ChannelHandler handler 事件处理Handler。
- private volatile URL url 该协议的第一个服务提供者的URL, Server只需要用到 URL中的参数,与具体某一个服务没什么关系。
- AbstractEndpoint
- private Codec2 codec 编码解码器。
- private int timeout 超时时间
- private int connectTimeout 连接超时时间
- AbstractServer
- private InetSocketAddress localAddress :url host:port地址。
- private InetSocketAddress bindAddress:如果是多网卡,并且指定了 bind.ip、bind.port,如果为空,与localAddress相同。
- private int accepts : AbstractServer#accepts未使用到。
- private int idleTimeout = 600; AbstractServer#accepts未使用到。
- NettyServer
- private Map< String, Channel> channels:< ip:port, channel> 所有通道。
- private ServerBootstrap bootstrap : netty 服务端启动器。
- private io.netty.channel.Channel channel:服务端监听通道。
- private EventLoopGroup bossGroup;Netty boss线程组(负责连接事件)
- private EventLoopGroup workerGroup : nety work线程组(负责IO事件)
1.1 NettyServer 构造方法
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
直接调用父类的public AbstractServer(URL url, ChannelHandler handler)方法,从前面的文章中得知, ChannelHandlers.wrap方法会对ChannelHandler handler进行封装,主要是加入事件分发模式(Dispatch)。
1.1.1 AbstractServer构造方法
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); // @1 localAddress = getUrl().toInetSocketAddress(); // @2 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); // @3 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); // @4 try { doOpen(); // @5 if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
代码@1:调用父类的构造方法,主要初始化AbstractPeer(channelHandler、url)和AbstractEndpoint(codec2、timeout、idleTimeout )
代码@2:根据URL中的host与端口,创建localAddress。
代码@3:如果配置了< dubbo:parameter key = "bind.ip" value = ""/> 与 < dubbo:parameter key = "bind.port" />,则用该IP与端口创建bindAddress,通常用于多网卡,如果未配置,bindAddress与 localAddress绑定的IP与端口一样。
代码@4:初始化accepts与idleTimeout ,这两个参数未被其他地方使用。
代码@5,调用doOpen方法,正式在相应端口建立网络监听。
1.2、源码分析NettyServer#doOpen
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ServerBootstrap(); // @1 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); // @2 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); // @3 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); // @4 channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) // @5 .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<niosocketchannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); // @6 channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
代码@1:创建Netty服务端启动帮助类ServerBootstrap.
代码@2:创建服务端Boss线程,线程名:.NettyServerBoss,主要负责客户端的连接事件,主从多Reactor线程模型中的主线程(连接事件)。
代码@3:创建服务端Work线程组,线程名:NettyServerWorker-序号,线程个数取自参数:iothreads,默认为(CPU核数+1)与32取小值,顾名思义,IO线程数,主要处理读写事件,编码、解码都在IO线程中完成。
代码@4:创建用户Handler,这里是NettyServerHandler。 代码@5:Netty启动的常规写法,关注如下内容:
addLast("decoder", adapter.getDecoder()) : 添加解码器 addLast("encoder", adapter.getEncoder()) :添加编码器 addLast("handler", nettyServerHandler) :添加业务Handler。
这里简单介绍一下流程:
- 客户端建立与服务端连接,此时Boss线程的连接事件触发,建立TCP连接,并向IO线程注册该通道(Channel0)的读事件。
- 当客户端向服务端发送请求消息后,IO线程中的读事件触发,会首先调用adapter.getDecoder() 根据对应的请求协议(例如dubbo)从二进制流中解码出一个完整的请求对象,然后传入到业务handler,例如nettyServerHandler,执行相应的事件方法,例如recive方法。
- 当服务端向Channel写入响应结果时,首先编码器会按照协议编码成二进制流,供客户端解码。
如果对Netty想深入学习的话,请移步到作者的《源码分析Netty系列》
2、HeaderExchangeServer
根据 Dubbo 服务端初始化流程,我们可知,Dubbo 为了封装各种不同的网络实现客户端(netty、mina)等,引入了 Exchangers 层,存在 ExchangeServer,其实现 Server 并内部持有具体的 Server 实现端,例如 NettyServer。
接下来,我们重点来关注一下 HeaderExchangeServer. 核心属性如下:
- ScheduledExecutorService scheduled:心跳线程数,线程名称前缀,dubbo-remoting-server-heartbeat-thread-序号
- private final Server server:具体的Server实现类,例如NettyServer。
- private ScheduledFuture< ?> heartbeatTimer:心跳调度Future,可以通过future取消心跳等动作。
- private int heartbeat:心跳间隔时间
- private int heartbeatTimeout:心跳超时时间,至少为heartbeat的两倍
2.1 构造函数
public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } startHeartbeatTimer(); }
说明,主要是通过heartbeat参数设置心跳间隔,如果不配置,则不启动心跳检测。从上面看来HeaderExchangeServer内部持有Server,并封装了心跳的功能,在这里就不细细分析了。
>作者介绍:丁威,《RocketMQ技术内幕》作者,RocketMQ 社区优秀布道师、CSDN2019博客之星TOP10,维护公众号:中间件兴趣圈目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。可以点击链接加入中间件知识星球 ,一起探讨高并发、分布式服务架构,交流源码。
</niosocketchannel>
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Redis5.0之后的内存策略--最新八种算法
前言:这八种算法是基于redis5.0版之后的,他新增了新增allkeys-lfu,volatile-lfu这两种算法,也就是多了LFU算法,而LFU与LRU算法不同在于;LRU是淘汰最近最长时间未使用的页面进行淘汰,而LFU是要求在页置换时置换引用计数最小的页,因为经常使用的页应该有一个较大的引用次数。但是有些页在开始时使用次数很多,但以后就不再使用,这类页将会长时间留在内存中,因此可以将引用计数寄存器定时右移一位,形成指数衰减的平均使用次数。 1. Redis内存策略 1.1 内存使用情况说明 Redis将数据都保存到内存中,如果一直往内存中存储数据,而不维护.将来可能导致内存数据存不下.内存溢出. 1.主动淘汰 在redis赋值操作执行时,可以添加超时时间.当时间一到则数据自动删除. 2.采用算法进行淘汰 1.2 Redis中内存优化算法 1.2.1LRU算法 LRU是Least Recently Used的缩写,即最近最少使用,是一种常用的页面置换算法,选择最近最久未使用的页面予以淘汰。该算法赋予每个页面一个访问字段,用来记录一个页面自上次被访问以来所经历的时间 t,当须淘汰一...
- 下一篇
深度学习最佳实践
本文首发自公众号:RAIS,欢迎关注。 最佳实践,顾名思义,就是做某事的最佳方法,当然,这里的最佳一定是绝大多数情况,但又不是百分百的情况,我们不必纠结这个问题,我们需要记住的是下面这些方法在深度学习实践中是非常好的做法。 回调机制 如果你看到这里,我有理由认为你是一个懂得程序设计懂得编程的有一定开发经验的程序员,既然如此,你一定对回调不陌生,回调类似于一种观察者的设计模式,我交给你一个任务去执行,交代结束我就继续去做自己的工作了,你执行结束了,不管结果是好是坏,你都要把结果告诉我,这就是回调的含义。 在我们之前的各种深度学习的例子中,总有一个参数是 epochs,含义是网络模型循环迭代的次数,一开始的时候,我们都会给出一个不小的值,让网络训练然后调整参数,这个 epochs 在调整参数的过程中总要保证模型要处于过拟合状态,只有这样,我们才能知道在什么情况下网络达到最优。在前面的大多数例子中这是合理的也是可行的,因为我们的网络训练的很快,数据集也没有大到在可接受时间内无法完成的情况,因此这并不影响什么,但是也有一些例外,例如前文提到的循环神经网络,训练的时间可能就有点让你等得不耐烦了,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker快速安装Oracle11G,搭建oracle11g学习环境