Spark1.6.0功能扩展——为HiveThriftServer2增加HA
前言
HiveThriftServer2是Spark基于HiveServer2实现的多Session管理的Thrift服务,提供对Hive的集中式管理服务。HiveThriftServer2作为Yarn上的Application,目前只支持yarn-client模式——即Driver运行在本地,ApplicationMaster运行在NodeManager所管理的Container中。yarn-client模式相较于yarn-cluster模式,在Driver和ApplicationMaster之间引入了额外的通信,因而服务的稳定性较低。
为了能够提高HiveThriftServer2的可用性,打算部署两个或者多个HiveThriftServer2实例,最终确定了选择HA的解决方案。网上有关HiveThriftServer2的HA实现,主要借助了HAProxy、Nginx等提供的反向代理和负载均衡功能实现。这种方案有个问题,那就是用户提交的执行SQL请求与HiveThriftServer2之间的连接一旦断了,反向代理服务器并不会主动将请求重定向到其他节点上,用户必须再次发出请求,这时才会与其他HiveThriftServer2建立连接。这种方案,究其根本更像是负载均衡,无法保证SQL请求不丢失、重连、Master/Slave切换等机制。
为了解决以上问题,我选择了第三种方案。
由于HiveThriftServer2本身继承自HiveServer2,所以HiveServer2自带的HA方案也能够支持HiveThriftServer2。对于HiveServer2自带的HA方案不熟悉的同学,可以百度一下,相关内容还是很多的。如果按照我的假设,就使用HiveServer2自带的HA方案的话,你会发现我的假设是错误的——HiveThriftServer2居然不支持HA。这是为什么呢?请读者务必保持平常心,我们来一起研究研究。
注意:我这里的Spark版本是1.6.0,Hive版本是1.2.1。
HiveServer2的HA分析
我从网上找到了一副能够有效展示HiveServer2的HA原理的图(具体来源无从考证)。
这幅图片很直观的为我们介绍了HiveServer2的HA架构。整个架构实际上围绕着ZooKeeper集群,利用ZooKeeper提供的创建节点、检索子节点等功能来实现。那么ZooKeeper的HA是如何实现的呢?让我们来进行源码分析吧。
HiveServer2本身是由Java语言开发,熟悉Java应用(如Tomcat、Spark的Master和Worker、Yarn的ResourceManager和NodeManager等)的同学应该知道,任何的Java应用必须要有一个main class。HiveServer2这个Thrift服务的main class就是HiveServer2类。HiveServer2的main方法如代码清单1所示。
代码清单1 HiveServer2的main方法
public static void main(String[] args) { HiveConf.setLoadHiveServer2Config(true); try { ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2"); ServerOptionsProcessorResponse oprocResponse = oproc.parse(args); // 省略无关代码 // Call the executor which will execute the appropriate command based on the parsed options oprocResponse.getServerOptionsExecutor().execute(); } catch (LogInitializationException e) { LOG.error("Error initializing log: " + e.getMessage(), e); System.exit(-1); } }
上边代码中首先创建了ServerOptionsProcessor对象并对参数进行解析,parse方法解析完参数返回了oprocResponse对象(类型为ServerOptionsProcessorResponse)。然后调用oprocResponse的 getServerOptionsExecutor方法得到的对象实际为 StartOptionExecutor。最后调用了 StartOptionExecutor的 execute方法。 StartOptionExecutor的实现见代码清单2。 代码清单2 StartOptionExecutor的实现
static class StartOptionExecutor implements ServerOptionsExecutor { @Override public void execute() { try { startHiveServer2(); } catch (Throwable t) { LOG.fatal("Error starting HiveServer2", t); System.exit(-1); } } }
从 代码清单2看到,StartOptionExecutor的execute方法实际调用了startHiveServer2方法,startHiveServer2方法中与HA相关的代码如下: if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { server.addServerInstanceToZooKeeper(hiveConf); }
可以看到调用了 HiveServer2的addServerInstanceToZooKeeper方法。这个addServerInstanceToZooKeeper的作用就是在指定的ZooKeeper集群上创建持久化的父节点作为HA的命名空间,并创建持久化的节点将HiveServer2的实例信息保存到节点上(addServerInstanceToZooKeeper方法的实现细节留给感兴趣的同学,自行阅读)。ZooKeeper集群如何指定?HA的命名空间又是什么?大家先记着这两个问题,最后在配置的时候,再告诉大家。 HiveThriftServer2为何不支持HiveServer2自带的HA?
使用jps命令查看 HiveThriftServer2的进程信息,如图。 def main(args: Array[String]) { val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2") if (!optionsProcessor.process(args)) { System.exit(-1) } // 省略无关代码 }
HiveServerServerOptionsProcessor的实现见代码清单4。 private[apache] class HiveServerServerOptionsProcessor(serverName: String) extends ServerOptionsProcessor(serverName) { def process(args: Array[String]): Boolean = { // A parse failure automatically triggers a system exit val response = super.parse(args) val executor = response.getServerOptionsExecutor() // return true if the parsed option was to start the service executor.isInstanceOf[StartOptionExecutor] } }
从 代码清单4看到,HiveServerServerOptionsProcessor继承了我们前文所说的ServerOptionsProcessor,并增加了process方法。process方法中调用了父类ServerOptionsProcessor的parse方法解析参数,并得到类型为ServerOptionsProcessorResponse的response,之后调用了response的getServerOptionsExecutor方法得到对象executor(实际类型为StartOptionExecutor),最后只是判断executor的类型是否是StartOptionExecutor。 为HiveThriftServer2添加HA功能
由于HiveServer2的startHiveServer2方法是静态私有方法,所以HiveThriftServer2不能够直接调用。为了使得HiveThriftServer2能够调用,只能采用反射来实现。在HiveThriftServer2的main方法中添加反射调用addServerInstanceToZooKeeper方法的代码见代码清单5。
代码清单5 反射调用addServerInstanceToZooKeeper方法
if (SparkSQLEnv.hiveContext.hiveconf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { val method = server.getClass.getSuperclass.getDeclaredMethod("addServerInstanceToZooKeeper", classOf[org.apache.hadoop.hive.conf.HiveConf]) method.setAccessible(true) method.invoke(server, SparkSQLEnv.hiveContext.hiveconf) }
至此,我们的改造完成。 配置
既然通过修改源码,HiveThriftServer2已经采用了HiveServer2的HA实现,所以就可以采用与HiveServer2相同的配置。在hive-site.xml文件中添加以下配置:
<property> <name>hive.server2.support.dynamic.service.discovery</name> <value>true</value> </property> <property> <name>hive.server2.zookeeper.namespace</name> <value>hiveserver2_zk</value> </property> <property> <name>hive.zookeeper.quorum</name> <value>zkNode1:2181,zkNode2:2181,zkNode3:2181</value> </property> <property> <name>hive.zookeeper.client.port</name> <value>2181</value> </property>
以上配置中,各个配置项的含义为: - hive.server2.zookeeper.namespace:HiveServer2注册到ZooKeeper集群时,需要的命名空间。实际上,第一个有此配置的HiveServer2实例将在ZooKeeper集群的根节点下创建以命名空间为名称的持久化节点。
- hive.server2.support.dynamic.service.discovery:是否开启HiveServer2的动态服务发现。开启此配置后,HiveServer2将向ZooKeeper集群的命名空间节点下创建服务的信息节点。
- hive.zookeeper.quorum:ZooKeeper集群的参与者列表。
- hive.zookeeper.client.port:ZooKeeper集群开放给客户端使用的端口。
测试
我们启动两个HiveThriftServer2实例,然后打开ZooKeeper客户端,就可以看到ZooKeeper集群的根节点下名称为hiveserver2_zk的持久化节点,如下图所示。
我们查看hiveserver2_zk节点下已经注册的服务,如下图所示。
使用beeline来测试,首先进入beeline,然后使用jdbc连接HiveThriftServer2,如下图所示。
通过jdbc连接时使用的jdbc URL的格式为:jdbc:hive2://<zookeeper quorum>/<dbName>;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk
在连接的过程中需要输入用户名、密码等信息。最终beeline会通过hive-jdbc从多个HiveThriftServer2实例中选择一个连接。
使用Java语言时,通过java jdbc也可以使用此HiveThriftServer2实例,只不过需要的jdbc driver为org.apache.hive.jdbc.HiveDriver。
功能扩展
通过ZooKeeper集群的服务发现,我们实现的HA实际跟HAProxy、Nginx等提供的负载均衡功能没有太多区别。如果发生网络超时、连接断开、执行失败等情况时,我们的客户端程序也会失败。为了在发生以上异常时能够进行重连、重试、选择其他服务进行重连,这都需要客户端代码去实现。由于实现方式多种多样,所以这里就不具体罗列,只将我个人实现的HiveThriftHAHelper类的各个关键功能进行介绍:
- init:从jdbc URL中解析出必要的参数,例如zookeeper quorum、serviceDiscoveryMode、zooKeeperNamespace等。
- getServerHosts:从ZooKeeper集群获取各个HiveThriftServer2实例的信息,并进行缓存。
- selectHost:从HiveThriftServer2实例中按照随机、轮询、Master/Slave等多种策略选择服务。
- execute:选择服务、执行SQL、异常重试等。
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,基于Spark2.1.0版本的《 Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Gartner:2017年将有过半大数据项目不成功
Gartner预测,2017年将有60%的大数据项目在试验阶段就会失败,并最终会被放弃。 在大数据正热的当下,这一结论无疑给众多的热心者泼了一道冷水。 随着企业努力在数字时代实现数据驱动,我们的生态系统正在发生重大变化。不光是企业应用程序生成的海量数据日增,在企业外部广泛的用户和难以数计连接的各种“事物”所产生的数据也呈指数级递增。这都导致企业围绕数据的洞察会变得越来越复杂。 我们不仅要问,企业在将数据资产链接到战略价值的这一过程中究竟出现了什么问题? 有专业人士认为,二者之间的主要障碍是缺乏技能或专业知识,以及技术战略与整体公司需求之间不匹配。 专业差距 我们都知道,大数据并非新近事物。早些年,当大数据处于起步阶段时,当时可用技术并不成熟。一些早期发展起来的知名网络公司,如谷歌、Facebook等不得不从根本上建立基础设施来处理相关问题。他们的成功也因此引来了更多的追随者,许多企业试图用自己基于Hadoop的大数据项目来效仿前者。 效仿的结果是,后者的IT和数据专业人员对Hadoop作为一个技术工具包能够做什么,以及对产生结果需要多少时间的预期出现偏差。Gartner的一项调查结果显...
- 下一篇
大数据技术如何创造更大价值
“大数据”这个词汇已经火了好几年,在最近的一两年里,风头似乎被人工智能、深度学习等概念抢走,逐渐成为“过气”的科技词汇。但事实上,我们认为这种炒作过后的“消亡”过程表明,大数据作为一个前沿技术在各个领域中开始了真正的应用。 2016年是大数据里程碑式的一年,不仅在很多行业中得到应用,辅助做出更有意义的决策,而且在可用性、备份和恢复等性能上变得对企业更加的友好、更易操作。我们也将持续关注那些使生产更高效,资源配置更合理,交易效率更快,从而提升生产者收益的优质项目。 在2017年,大数据技术将如何为企业和用户创造更大的价值? 大数据经过了几年的发展,在基础设施建设上已经取得了长足的进步,一些企业已经成为了上市公司(例如HortonWorks和NewRelic),还有一些例如Cloudera、MongoDB等公司融资也已经超过了一亿美元。在基础设施层面加强存储和处理大数据的能力之外,各个行业对大数据的应用才刚刚开始。 在过去的一年中,我们看到了大数据在金融科技、医疗、农业、企业服务等行业的深度应用,大数据分析使设备连接更快速、决策更聪明、运转更高效。那么在2017年,大数据又能呈现出哪些新的...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作