Spark 如何写入HBase/Redis/MySQL/Kafka

一些概念

一个partition 对应一个task,一个task 必定存在于一个Executor,一个Executor 对应一个JVM.
  • Partition 是一个可迭代数据集合
  • Task 本质是作用于Partition的线程

问题

Task 里如何使用Kafka Producer 将数据发送到Kafaka呢。 其他譬如HBase/Redis/MySQL 也是如此。

解决方案

直观的解决方案自然是能够在Executor(JVM)里有个Prodcuer Pool(或者共享单个Producer实例),但是我们的代码都是现在Driver端执行,然后将一些函数序列化到Executor端执行,这里就有序列化问题,正常如Pool,Connection都是无法序列化的。
一个简单的解决办法是定义个Object 类,
譬如
object SimpleHBaseClient {
  private val DEFAULT_ZOOKEEPER_QUORUM = "127.0.0.1:2181"

  private lazy val (table, conn) = createConnection

  def bulk(items:Iterator) = {
      items.foreach(conn.put(_))
      conn.flush....
  } 
 ......
}
然后保证这个类在map,foreachRDD等函数下使用,譬如:
dstream.foreachRDD{ rdd =>
    rdd.foreachPartition{iter=>
        SimpleHBaseClient.bulk(iter)  
    }
}
为什么要保证放到foreachRDD /map 等这些函数里呢?Spark的机制是先将用户的程序作为一个单机运行(运行者是Driver),Driver通过序列化机制,将对应算子规定的函数发送到Executor进行执行。这里,foreachRDD/map 等函数都是会发送到Executor执行的,Driver端并不会执行。里面引用的object 类 会作为一个stub 被序列化过去,object内部属性的的初始化其实是在Executor端完成的,所以可以避过序列化的问题。
Pool也是类似的做法。然而我们并不建议使用pool,因为Spark 本身已经是分布式的,举个例子可能有100个executor,如果每个executor再搞10个connection的pool,则会有100*10 个链接,Kafka也受不了。一个Executor 维持一个connection就好。
关于Executor挂掉丢数据的问题,其实就看你什么时候flush,这是一个性能的权衡。
优秀的个人博客,低调大师

微信关注我们

原文链接:https://yq.aliyun.com/articles/60482

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
优质分享Android(本站安卓app)

优质分享Android(本站安卓app)

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。