- 在hbase2.x的时候,按照之前的继承
BaseRegionObserver 是不起作用的,经过我的测试,这个类好像是被移除了,我使用的版本是2.1.1
-
新的实现可以查看接口Coprocessor来查看,我们来看一下
/**
* Base interface for the 4 coprocessors - MasterCoprocessor, RegionCoprocessor,
* RegionServerCoprocessor, and WALCoprocessor.
* Do NOT implement this interface directly. Unless an implementation implements one (or more) of
* the above mentioned 4 coprocessors, it'll fail to be loaded by any coprocessor host.
*
* Example:
* Building a coprocessor to observe Master operations.
* <pre>
* class MyMasterCoprocessor implements MasterCoprocessor {
* @Override
* public Optional<MasterObserver> getMasterObserver() {
* return new MyMasterObserver();
* }
* }
* class MyMasterObserver implements MasterObserver {
* ....
* }
* </pre>
* Building a Service which can be loaded by both Master and RegionServer
* <pre>
* class MyCoprocessorService implements MasterCoprocessor, RegionServerCoprocessor {
* @Override
* public Optional<Service> getServices() {
* return new ...;
* }
* }
*/
-
这是Coprocessor接口的介绍,非常清楚的介绍了协处理器的用法,那么我就简单的翻译一下下,自己的英语很渣的
有四个接口的父类是coprocessors,他们分别是MasterCoprocessor, RegionCoprocessor, RegionServerCoprocessor, WALCoprocessor.
不要去直接实现coprocessors接口,除非你的实现类实现了上述4个Observer
然后下面的就是一些具体的实现代码
- 经过之前的介绍,我们知道了Observer的作用就类似一个触发器,当指定操作也就是你实现的方法对应的操作被执行的时候,你定义的Observer逻辑就会去在HBase上去跑,以实现一些控制
- 当然上面的coprocessors的实现不知四个接口,还有一些其他实现类,比如
AccessController,这个提供数据访问和管理的基本授权检查等操作,还有一些自己可以去查看一下
-
多说无益,直接上 需求
t1表中有f1列族
向f1中插入name,然后自动计算出name的hash作为此行的password列,不可单独添加password
可删除name,顺便一起删除password,不可单独删除password
-
maven
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.7</version>
</dependency>
</dependencies>
-
直接上 代码
public class MyRegionCoprocessor implements RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.ofNullable(new MyRegionObserver());
}
private static class MyRegionObserver implements RegionObserver {
private static final byte[] FAMILY = Bytes.toBytes("f1");
private static final byte[] COL_NAME = Bytes.toBytes("name");
private static final byte[] COL_PASSWORD = Bytes.toBytes("password");
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {
//如果插入的是password列,那么就报错
List<Cell> passwords = put.get(FAMILY, COL_PASSWORD);
if (passwords.size() != 0 || !passwords.isEmpty()){
throw new IllegalArgumentIOException("Password insertion is not allowed !!");
}
//如果添加的是name,那么就增加相对应的name的hash的password列
List<Cell> names = put.get(FAMILY, COL_NAME);
names.forEach(name -> {
int hash = Objects.hash(name);
put.addColumn(FAMILY,COL_PASSWORD,Bytes.toBytes(hash));
});
}
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException {
//阻止删除密码,不可单独删除密码
List<Cell> passwords = delete.get(FAMILY, COL_PASSWORD);
if (passwords.size() != 0 || !passwords.isEmpty()){
throw new IllegalArgumentIOException("Password insertion is not delete !!");
}
//到这就代表不是密码了,获取name,然后删除对应的password
List<Cell> names = delete.get(FAMILY, COL_NAME);
names.forEach(name -> {
delete.addColumns(FAMILY,COL_PASSWORD);
});
}
}
}
- 如果你对上面的
Optional类感到陌生,你就记住他是用来判断值是否是空的就可以,如果想进一步了解,你可以参考我的专辑Java8学习,希望能帮助到你
- 上面的代码逻辑还是比较清晰的,在外部类的重写的方法
getRegionObserver中返回了我们自己实现的Observer类,当然这个方法是不用必须重写的,当我们需要实现自己的Observer的时候必须重写getRegionObserver,而当我们实现自己的EndPoint的时候,就需要重写getEndpointObserver方法了,还有一个在RegionCoprocessor中的方法是getBulkLoadObserver,从名字我们可以看到是批加载用的
- 当我们实现好了自己的Observer的时候,我们就需要部署到HBase集群中去看看效果,在网上我并没有找到类似本地测试Observer的办法,这可真是麻烦.
-
开始部署,当然打成jar,我就不说了,打完后,部署到HBase的方式有两种,一种是通过配置文件的方式去部署,一种是通过动态的方式去部署,前者称为静态部署,后面即动态部署, 静态部署 我将简单介绍一下,如下,而自己使用的是动态部署的方法
在hbase-site.xml中配置静态部署
<property>
<name>hbase.coprocessor.region.classes</name> #这是配置region的协处理器,对应的就是:hbase.coprocessor.master.classes
<value>你的全类名</value> #如果想同时配置多个协处理器,可以用逗号分隔多个协处理器的类名
</property>
相关的配置
hbase.coprocessor.enabled:是否启用协处理器机制,默认true,即开启
hbase.coprocessor.user.enabled:即是否允许用户动态配置,如果为false,就只能在xml中配置了
hbase.coprocessor.wal.classes:即监控wal的
hbase.coprocessor.abortonerror:即如果个别协处理器启动失败整个hbase是否启动,默认是如果有的协处理器启动失败,那么hbase就不起了
需要注意的是
1.配置顺序决定了执行顺序
2. 所有协处理器是以系统级优先级加载的 (优先级一会儿会提到)
3. 重启集群起作用
-
动态配置
- 首先你如果使用我上面的代码测试,那么请
create 't1','f1'
- 然后将你的jar上传到集群,然后上传到hdfs,如果你使用我的代码测试,请直接将jar上传到
hdfs:///hbase.jar
- 然后
disable 't1'
- 然后使用特定命令在表上配置协处理器:
alter 't1','coprocessor'=>'hdfs:///hbase.jar|qidai.MyRegionCoprocessor|1001|''
- 再然后
enabled 't1'
- 然后
desc 't1',你就会看到
hbase(main):086:0> desc 't1'
Table t1 is ENABLED
#注意这里代表已经配置上去了,如果没有配置上的话,就是 : 紧接着COLUMN FAMILIES DESCRIPTION 字眼,并没有下面的说明
t1, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://hbase.jar|qidai.MyRegionCoprocessor|1001|'}
COLUMN FAMILIES DESCRIPTION
- 到了这一步其实就可以了,然后在t1表中尽情测试需求就好了,我测试的时候就很简单的测试了一下,如果你使用我的代码测试,如果有问题,请及时指正哈!!!如果你发现你的动态配置貌似没起作用,那么请重启一下对应的RegionServer,我出现了这个问题,估计是我的笔记本内存上限了,没反应过来??????
-
到这就该 说说优先级 了,如上如果配置了处理器,多出来的那一行我们分析一下
alter 't1','coprocessor'=>'hdfs:///hbase.jar|qidai.MyRegionCoprocessor|1001|'
alter '表名','coprocessor'=>'hdfs地址|你的全类名|优先级|传入给你实现类的参数'
这里的优先级是0是最高的,以整数来表示的,越小值越先被执行
-
好了到这之后,大概的一些问题已经解决了,那么怎么将Observer给 卸载 呢 ?
- 静态部署的话,那么删除对应的xml内容,然后重启HBase即可
- 动态部署的话,我们可以先
disable,然后使用alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'来进行卸载,在最后面的coprocessor$1需要自己改一下,看看自己的处理器名是啥就写啥,然后在enable就行了
- 剩余的EndPoint的使用这篇暂时不介绍, 不过迟早会补上的 , 因为现在只对Avro了解一些, 而Protobuf 不太清楚, 恩..今天这篇涉及的只是Observer一些最基本的操作,是并没有涉及什么原理之类的, 所以在了解其原理之前这也是必备的知识, 如果本片文章对你有些许帮助那就点个赞吧哈哈