您现在的位置是:首页 > 文章详情

Hbase-Observer

日期:2019-01-03点击:348
  • HBase的协处理器涵盖了两种类似关系型数据库中的应用场景:存储过程和触发器,所以协处理器也分为两种:用来实现存储过程功能的终端 程序EndPoint和用来实现触发器功能的观察者Observers

Observer

  • 在hbase2.x的时候,按照之前的继承BaseRegionObserver 是不起作用的,经过我的测试,这个类好像是被移除了,我使用的版本是2.1.1
  • 新的实现可以查看接口Coprocessor来查看,我们来看一下

    /** * Base interface for the 4 coprocessors - MasterCoprocessor, RegionCoprocessor, * RegionServerCoprocessor, and WALCoprocessor. * Do NOT implement this interface directly. Unless an implementation implements one (or more) of * the above mentioned 4 coprocessors, it'll fail to be loaded by any coprocessor host. * * Example: * Building a coprocessor to observe Master operations. * <pre> * class MyMasterCoprocessor implements MasterCoprocessor { * &#64;Override * public Optional&lt;MasterObserver> getMasterObserver() { * return new MyMasterObserver(); * } * } * class MyMasterObserver implements MasterObserver { * .... * } * </pre> * Building a Service which can be loaded by both Master and RegionServer * <pre> * class MyCoprocessorService implements MasterCoprocessor, RegionServerCoprocessor { * &#64;Override * public Optional&lt;Service> getServices() { * return new ...; * } * } */
  • 这是Coprocessor接口的介绍,非常清楚的介绍了协处理器的用法,那么我就简单的翻译一下下,自己的英语很渣的

    有四个接口的父类是coprocessors,他们分别是MasterCoprocessor, RegionCoprocessor, RegionServerCoprocessor, WALCoprocessor. 不要去直接实现coprocessors接口,除非你的实现类实现了上述4个Observer 然后下面的就是一些具体的实现代码
  • 经过之前的介绍,我们知道了Observer的作用就类似一个触发器,当指定操作也就是你实现的方法对应的操作被执行的时候,你定义的Observer逻辑就会去在HBase上去跑,以实现一些控制
  • 当然上面的coprocessors的实现不知四个接口,还有一些其他实现类,比如AccessController,这个提供数据访问和管理的基本授权检查等操作,还有一些自己可以去查看一下
  • 多说无益,直接上 需求

     t1表中有f1列族 向f1中插入name,然后自动计算出name的hash作为此行的password列,不可单独添加password 可删除name,顺便一起删除password,不可单独删除password
  • maven

    <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.7</version> </dependency> </dependencies>
  • 直接上 代码

    public class MyRegionCoprocessor implements RegionCoprocessor { @Override public Optional<RegionObserver> getRegionObserver() { return Optional.ofNullable(new MyRegionObserver()); } private static class MyRegionObserver implements RegionObserver { private static final byte[] FAMILY = Bytes.toBytes("f1"); private static final byte[] COL_NAME = Bytes.toBytes("name"); private static final byte[] COL_PASSWORD = Bytes.toBytes("password"); @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException { //如果插入的是password列,那么就报错 List<Cell> passwords = put.get(FAMILY, COL_PASSWORD); if (passwords.size() != 0 || !passwords.isEmpty()){ throw new IllegalArgumentIOException("Password insertion is not allowed !!"); } //如果添加的是name,那么就增加相对应的name的hash的password列 List<Cell> names = put.get(FAMILY, COL_NAME); names.forEach(name -> { int hash = Objects.hash(name); put.addColumn(FAMILY,COL_PASSWORD,Bytes.toBytes(hash)); }); } @Override public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException { //阻止删除密码,不可单独删除密码 List<Cell> passwords = delete.get(FAMILY, COL_PASSWORD); if (passwords.size() != 0 || !passwords.isEmpty()){ throw new IllegalArgumentIOException("Password insertion is not delete !!"); } //到这就代表不是密码了,获取name,然后删除对应的password List<Cell> names = delete.get(FAMILY, COL_NAME); names.forEach(name -> { delete.addColumns(FAMILY,COL_PASSWORD); }); } } }
  • 如果你对上面的Optional类感到陌生,你就记住他是用来判断值是否是空的就可以,如果想进一步了解,你可以参考我的专辑Java8学习,希望能帮助到你
  • 上面的代码逻辑还是比较清晰的,在外部类的重写的方法getRegionObserver中返回了我们自己实现的Observer类,当然这个方法是不用必须重写的,当我们需要实现自己的Observer的时候必须重写getRegionObserver,而当我们实现自己的EndPoint的时候,就需要重写getEndpointObserver方法了,还有一个在RegionCoprocessor中的方法是getBulkLoadObserver,从名字我们可以看到是批加载用的
  • 当我们实现好了自己的Observer的时候,我们就需要部署到HBase集群中去看看效果,在网上我并没有找到类似本地测试Observer的办法,这可真是麻烦.
  • 开始部署,当然打成jar,我就不说了,打完后,部署到HBase的方式有两种,一种是通过配置文件的方式去部署,一种是通过动态的方式去部署,前者称为静态部署,后面即动态部署, 静态部署 我将简单介绍一下,如下,而自己使用的是动态部署的方法

    在hbase-site.xml中配置静态部署 <property> <name>hbase.coprocessor.region.classes</name> #这是配置region的协处理器,对应的就是:hbase.coprocessor.master.classes <value>你的全类名</value> #如果想同时配置多个协处理器,可以用逗号分隔多个协处理器的类名 </property> 相关的配置 hbase.coprocessor.enabled:是否启用协处理器机制,默认true,即开启 hbase.coprocessor.user.enabled:即是否允许用户动态配置,如果为false,就只能在xml中配置了 hbase.coprocessor.wal.classes:即监控wal的 hbase.coprocessor.abortonerror:即如果个别协处理器启动失败整个hbase是否启动,默认是如果有的协处理器启动失败,那么hbase就不起了 需要注意的是 1.配置顺序决定了执行顺序 2. 所有协处理器是以系统级优先级加载的 (优先级一会儿会提到) 3. 重启集群起作用
  • 动态配置

    • 首先你如果使用我上面的代码测试,那么请create 't1','f1'
    • 然后将你的jar上传到集群,然后上传到hdfs,如果你使用我的代码测试,请直接将jar上传到hdfs:///hbase.jar
    • 然后disable 't1'
    • 然后使用特定命令在表上配置协处理器:alter 't1','coprocessor'=>'hdfs:///hbase.jar|qidai.MyRegionCoprocessor|1001|''
    • 再然后enabled 't1'
    • 然后desc 't1',你就会看到
    hbase(main):086:0> desc 't1' Table t1 is ENABLED #注意这里代表已经配置上去了,如果没有配置上的话,就是 : 紧接着COLUMN FAMILIES DESCRIPTION 字眼,并没有下面的说明 t1, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://hbase.jar|qidai.MyRegionCoprocessor|1001|'} COLUMN FAMILIES DESCRIPTION 
    • 到了这一步其实就可以了,然后在t1表中尽情测试需求就好了,我测试的时候就很简单的测试了一下,如果你使用我的代码测试,如果有问题,请及时指正哈!!!如果你发现你的动态配置貌似没起作用,那么请重启一下对应的RegionServer,我出现了这个问题,估计是我的笔记本内存上限了,没反应过来??????
  • 到这就该 说说优先级 了,如上如果配置了处理器,多出来的那一行我们分析一下

    alter 't1','coprocessor'=>'hdfs:///hbase.jar|qidai.MyRegionCoprocessor|1001|' alter '表名','coprocessor'=>'hdfs地址|你的全类名|优先级|传入给你实现类的参数' 这里的优先级是0是最高的,以整数来表示的,越小值越先被执行
  • 好了到这之后,大概的一些问题已经解决了,那么怎么将Observer给 卸载 呢 ?

    • 静态部署的话,那么删除对应的xml内容,然后重启HBase即可
    • 动态部署的话,我们可以先disable,然后使用alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'来进行卸载,在最后面的coprocessor$1需要自己改一下,看看自己的处理器名是啥就写啥,然后在enable就行了
  • 剩余的EndPoint的使用这篇暂时不介绍, 不过迟早会补上的 , 因为现在只对Avro了解一些, 而Protobuf 不太清楚, 恩..今天这篇涉及的只是Observer一些最基本的操作,是并没有涉及什么原理之类的, 所以在了解其原理之前这也是必备的知识, 如果本片文章对你有些许帮助那就点个赞吧哈哈
原文链接:https://yq.aliyun.com/articles/684392
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章