spark2.1.0之源码分析——RPC管道初始化

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81197447

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》

《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》一文的代码清单6——创建TransportClient,《spark2.1.0之源码分析——RPC服务器TransportServer》一文的代码清单3——对TransportServer初始化的实现中都在管道初始化回调函数中调用了TransportContext的initializePipeline方法,initializePipeline方法(见代码清单1)将调用Netty的API对管道初始化。

代码清单1         管道初始化

  public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        .addLast("handler", channelHandler);
      return channelHandler;
    } catch (RuntimeException e) {
      logger.error("Error while initializing Netty pipeline", e);
      throw e;
    }
  }

根据代码清单1,initializePipeline方法的执行步骤如下:

  1. 调用createChannelHandler方法创建TransportChannelHandler,从createChannelHandler的实现(见代码清单2)中可以看到真正创建TransportClient是在这里发生的。从TransportClient的构造过程看到RpcHandler 与TransportClient毫无关系,TransportClient只使用了TransportResponseHandler。TransportChannelHandler在服务端将代理TransportRequestHandler对请求消息进行处理,并在客户端代理TransportResponseHandler对响应消息进行处理。
  2. 对管道进行设置,这里的ENCODER(即MessageEncoder)派生自Netty的ChannelOutboundHandler接口;DECODER(即MessageDecoder)、TransportChannelHandler以及TransportFrameDecoder(由工具类NettyUtils的静态方法createFrameDecoder创建)派生自Netty的ChannelInboundHandler接口;IdleStateHandler同时实现了ChannelOutboundHandler和ChannelInboundHandler接口。根据Netty的API行为,通过addLast方法注册多个handler时,ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,因此在管道两端(无论是服务端还是客户端)处理请求和响应的流程如图1所示。

代码清单2         创建TransportChannelHandler

  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

管道处理请求和响应的流程图

图1       管道处理请求和响应的流程图

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

优秀的个人博客,低调大师

微信关注我们

原文链接:https://yq.aliyun.com/articles/632422

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
优质分享Android(本站安卓app)

优质分享Android(本站安卓app)

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。

Sublime Text 一个代码编辑器

Sublime Text 一个代码编辑器

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。