Spark Communication Internals
Spark RPC: the Outbox and the Inbox
More resources
- GitHub: https://github.com/opensourceteams/spark-scala-maven
- CSDN (video roundup, watch online): https://blog.csdn.net/thinktothings/article/details/84726769
 
YouTube video
- YouTube: https://youtu.be/3vUVwbEGf1E
 
Bilibili video
- Bilibili: https://www.bilibili.com/video/av37442199/
 
Spark communication overview (original diagram omitted)
In outline: a message sent through an RpcEndpointRef is handed to the Dispatcher; messages bound for remote endpoints are queued in a per-address Outbox and written over a Netty TransportClient, and on the receiving side the Dispatcher posts each incoming message to the target endpoint's Inbox.
The Outbox
Outgoing messages are stored in a java.util.LinkedList:
  private val messages = new java.util.LinkedList[OutboxMessage]
  
  /**
   * Send a message. If there is no active connection, cache it and launch a new connection. If
   * [[Outbox]] is stopped, the sender will be notified with a [[SparkException]].
   */
  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }
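 
Each element stored in this list implements the internal OutboxMessage contract, sketched below. This is abridged from Spark's private org.apache.spark.rpc.netty layer, so treat the exact modifiers as approximate rather than verbatim:

  import org.apache.spark.network.client.TransportClient

  // Abridged sketch of the contract each queued message satisfies.
  private[netty] sealed trait OutboxMessage {
    // Write this message out on an established connection.
    def sendWith(client: TransportClient): Unit
    // Invoked when the Outbox drops the message, e.g. because it was stopped.
    def onFailure(e: Throwable): Unit
  }

onFailure is exactly the hook that send() above uses to notify the sender when the Outbox has already been stopped.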
 
Sending messages
- Take the head of the queue: message = messages.poll()
- Deliver it over the active connection: message.sendWith(_client)
 
  /**
   * Drain the message queue. If another thread is already draining, just exit. If the connection
   * has not been established, launch a task in the `nettyEnv.clientConnectionExecutor` to set up
   * the connection.
   */
  private def drainOutbox(): Unit = {
    var message: OutboxMessage = null
    synchronized {
      if (stopped) {
        return
      }
      if (connectFuture != null) {
        // We are connecting to the remote address, so just exit
        return
      }
      if (client == null) {
        // There is no connect task but client is null, so we need to launch the connect task.
        launchConnectTask()
        return
      }
      if (draining) {
        // There is some thread draining, so just exit
        return
      }
      message = messages.poll()
      if (message == null) {
        return
      }
      draining = true
    }
    while (true) {
      try {
        val _client = synchronized { client }
        if (_client != null) {
          message.sendWith(_client)
        } else {
          assert(stopped == true)
        }
      } catch {
        case NonFatal(e) =>
          handleNetworkFailure(e)
          return
      }
      synchronized {
        if (stopped) {
          return
        }
        message = messages.poll()
        if (message == null) {
          draining = false
          return
        }
      }
    }
  }
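 
The draining flag is the key trick: any thread may call drainOutbox(), but only one thread at a time becomes the drainer, and the lock is held only while touching shared state, never while a message is being sent. A minimal, self-contained sketch of that pattern follows (not Spark code; DrainQueue and deliver are illustrative names):

import java.util.LinkedList

import scala.util.control.NonFatal

// Minimal sketch of the single-drainer pattern: many producers enqueue,
// at most one thread drains, and delivery happens outside the lock.
class DrainQueue[T](deliver: T => Unit) {

  private val messages = new LinkedList[T]()
  private var draining = false

  def send(message: T): Unit = {
    synchronized { messages.add(message) }
    drain()
  }

  private def drain(): Unit = {
    var message: T = null.asInstanceOf[T]
    synchronized {
      if (draining) return          // another thread already owns the drainer role
      message = messages.poll()
      if (message == null) return   // nothing to deliver
      draining = true               // claim the drainer role
    }
    while (true) {
      try {
        deliver(message)            // deliver without holding the lock
      } catch {
        case NonFatal(_) =>
          synchronized { draining = false }  // release the role on failure
          return
      }
      synchronized {
        message = messages.poll()
        if (message == null) {
          draining = false          // release the role before exiting
          return
        }
      }
    }
  }
}

A caller would construct new DrainQueue[String](s => println(s)) and invoke send from any number of threads; whichever thread finds the queue non-empty and unclaimed does the delivery work, which is why Outbox.send simply enqueues and then calls drainOutbox().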
 
The Inbox
- RpcMessage messages invoke the endpoint's receiveAndReply method
- OneWayMessage messages invoke the endpoint's receive method
 
  /**
   * Process stored messages.
   */
  def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    inbox.synchronized {
      if (!enableConcurrent && numActiveThreads != 0) {
        return
      }
      message = messages.poll()
      if (message != null) {
        numActiveThreads += 1
      } else {
        return
      }
    }
    while (true) {
      safelyCall(endpoint) {
        message match {
          case RpcMessage(_sender, content, context) =>
            try {
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch {
              case NonFatal(e) =>
                context.sendFailure(e)
                // Throw the exception -- this exception will be caught by the safelyCall function.
                // The endpoint's onError function will be called.
                throw e
            }
          case OneWayMessage(_sender, content) =>
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })
          case OnStart =>
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }
          case OnStop =>
            val activeThreads = inbox.synchronized { inbox.numActiveThreads }
            assert(activeThreads == 1,
              s"There should be only a single active thread but found $activeThreads threads.")
            dispatcher.removeRpcEndpointRef(endpoint)
            endpoint.onStop()
            assert(isEmpty, "OnStop should be the last message")
          case RemoteProcessConnected(remoteAddress) =>
            endpoint.onConnected(remoteAddress)
          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)
          case RemoteProcessConnectionError(cause, remoteAddress) =>
            endpoint.onNetworkError(cause, remoteAddress)
        }
      }
      inbox.synchronized {
        // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
        // every time.
        if (!enableConcurrent && numActiveThreads != 1) {
          // If we are not the only one worker, exit
          numActiveThreads -= 1
          return
        }
        message = messages.poll()
        if (message == null) {
          numActiveThreads -= 1
          return
        }
      }
    }
  }
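 
Seen from the endpoint's side, process() drives the two partial functions an RpcEndpoint implements. Below is a minimal sketch of such an endpoint; note that RpcEndpoint is private[spark], so a real version would have to live inside Spark's own source tree, and Ping, Pong and Log are made-up message types:

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Made-up message types, for illustration only.
case class Ping(id: Int)
case class Pong(id: Int)
case class Log(text: String)

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // OneWayMessage content is dispatched here: fire-and-forget, no reply channel.
  override def receive: PartialFunction[Any, Unit] = {
    case Log(text) => println(s"log: $text")
  }

  // RpcMessage content is dispatched here: answer (or fail) through the context.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ping(id) => context.reply(Pong(id))
  }
}

A message type that neither partial function handles falls through to the applyOrElse defaults in process() and is reported as an unsupported message. Note also the concurrency switch: only endpoints that are not ThreadSafeRpcEndpoint flip enableConcurrent to true after OnStart, so a ThreadSafeRpcEndpoint is guaranteed to have its messages processed by one thread at a time.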
 