Hadoop学习笔记(四):HBase
HBase是在一个HDFS上开发的面向列的分布式数据库。HBase不是关系型数据库,不支持SQL。 HTable一些基本概念 Row key 行主键, HBase不支持条件查询和Order by等查询,读取记录只能按Row key(及其range)或全表扫描,因此Row key需要根据业务来设计以利用其存储排序特性(Table按Row key字典序排序如1,10,100,11,2)提高性能。 Column Family(列族) 在表创建时声明,每个Column Family为一个存储单元。在上例中设计了一个HBase表blog,该表有两个列族:article和author。 Column(列) HBase的每个列都属于一个列族,以列族名为前缀,如列article:title和article:content属于article列族,author:name和author:nickname属于author列族。Column不用创建表时定义即可以动态新增,同一Column Family的Columns会群聚在一个存储单元上,并依Column key排序,因此设计时应将具有相同I/O特性的Column设计在一个Column Family上以提高性能。 Timestamp HBase通过row和column确定一份数据,这份数据的值可能有多个版本,不同版本的值按照时间倒序排序,即最新的数据排在最前面,查询时默认返回最新版本。如上例中row key=1的author:nickname值有两个版本,分别为1317180070811对应的“一叶渡江”和1317180718830对应的“yedu”(对应到实际业务可以理解为在某时刻修改了nickname为yedu,但旧值仍然存在)。Timestamp默认为系统当前时间(精确到毫秒),也可以在写入数据时指定该值。 Value 每个值通过4个键唯一索引,tableName+RowKey+ColumnKey+Timestamp=>value,例如上例中{tableName=’blog’,RowKey=’1’,ColumnName=’author:nickname’,Timestamp=’ 1317180718830’}索引到的唯一值是“yedu”。 存储类型 TableName 是字符串RowKey 和 ColumnName 是二进制值(Java 类型 byte[])Timestamp 是一个 64 位整数(Java 类型 long)value 是一个字节数组(Java类型 byte[])。 SHELL操作 HBase提供了丰富的访问接口,其中HBase Shell是常用的便捷方式。 • HBase Shell • Java clietn API • Jython、Groovy DSL、Scala • REST • Thrift(Ruby、Python、Perl、C++…) • MapReduce • Hive/Pig 创建表> create 'test','data1','data2' Column Family是schema的一部分,而Column不是。这里的data1和data2是Column Family。 增加记录>put 'test','1','data1:name','luc'>put 'test','1','data1:age','24'>put 'test','1','data2:height','170cm'>put 'test','1','data2:weight','65kg'>put 'test','1','data1:nickname','vichao'Column完全动态扩展,每行可以有不同的Columns。(ps:好像有点胖啊,要减肥~~) 根据RowKey查询> get 'test','1' HTable按RowKey字典序(1,10,100,11,2)自动排序,每行包含任意数量的Columns,Columns按ColumnKey(data1:age,data1:name,data1:nickname,data2:height,data2:weight)自动排序。 更新操作 查询值: >get 'test','1','data1:nickname' 更新nickname为'vic': >put 'test','1','data1:nickname','vic' 查询更新后的结果:(返回的将是vic) > get ‘blog’,’1’,’data1:nickname’ 知识点回顾:查询默认返回最近的值。 查询nickname的多个(本示例为2个)版本值 > get 'test','1',{COLUMN => 'data1:nickname',VERSIONS => 2}知识点回顾:每个Column可以有任意数量的Values,按Timestamp倒序自动排序。 如何只查询到以前的旧版本呢,需要借助Timestamp >get 'test','1',{COLUMN=>'data1:nickname',TIMESTAMP=>1373707746997} 知识点回顾:TabelName+RowKey+Column+Timestamp=>Value 删除记录 delete只能删除一个column >delete 'test','1','data1:nickname' 删除RowKey的所有column用deleteall >deleteall 'test','1' 删除表练习完毕,把练习表删了吧,删除之前需要先disable>disable 'test'>drop 'test' JAVA API操作 javaAPI操作还是比较简单的,各种api类和api函数,直接上代码了 import java.io.IOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.conf.Configuration; public class ExampleClient { public static void main(String[] args) throws IOException { Configuration conf = HBaseConfiguration.create(); // Create table HBaseAdmin admin = new HBaseAdmin(conf); HTableDescriptor htd = new HTableDescriptor("test"); HColumnDescriptor hcd = new HColumnDescriptor("data"); htd.addFamily(hcd); admin.createTable(htd); byte[] tablename = htd.getName(); HTableDescriptor[] tables = admin.listTables(); if (tables.length != 1 && Bytes.equals(tablename, tables[0].getName())) { throw new IOException("Failed create of table!"); } // Run some operations HTable table = new HTable(conf, tablename); byte[] row1 = Bytes.toBytes("row1"); Put p1 = new Put(row1); byte[] databytes = Bytes.toBytes("data"); p1.add(databytes, Bytes.toBytes("1"), Bytes.toBytes("value1")); table.put(p1); Get g = new Get(row1); Result result = table.get(g); System.out.println("GET:" + result); Scan scan = new Scan(); ResultScanner scanner = table.getScanner(scan); try { for (Result scannerResult : scanner) { System.out.println("Scan:" + scannerResult); } } catch (Exception e) { e.printStackTrace(); } finally { scanner.close(); } // Drop the table admin.disableTable(tablename); admin.deleteTable(tablename); } } 这里要注意的一点是连接到HBase需要将将HBase环境的hbase-site.xml文件引入到工程中,就像jdbc的数据库连接一样,不然是连不上hbase滴。 Configuration conf = HBaseConfiguration.create();//这行代码会从hbase-site.xml中读取配置信息 HBase MapReduce操作 直接上代码了: import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; public class testHBaseMR { public static class Mapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { public Mapper() { } @Override public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException { ImmutableBytesWritable value = null; String[] tags = null; for (KeyValue kv : values.list()) { if ("author".equals(Bytes.toString(kv.getFamily())) && "nickname".equals(Bytes.toString(kv.getQualifier()))) { value = new ImmutableBytesWritable(kv.getValue()); } if ("article".equals(Bytes.toString(kv.getFamily())) && "tags".equals(Bytes.toString(kv.getQualifier()))) { tags = Bytes.toString(kv.getValue()).split(","); } } for (int i = 0; i < tags.length; i++) { ImmutableBytesWritable key = new ImmutableBytesWritable( Bytes.toBytes(tags[i].toLowerCase())); try { context.write(key, value); } catch (InterruptedException e) { throw new IOException(e); } } } } public static class Reducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> { @Override public void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException { String friends = ""; for (ImmutableBytesWritable val : values) { friends += (friends.length() > 0 ? "," : "") + Bytes.toString(val.get()); } Put put = new Put(key.get()); put.add(Bytes.toBytes("person"), Bytes.toBytes("nicknames"), Bytes.toBytes(friends)); context.write(key, put); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf = HBaseConfiguration.create(conf); Job job = new Job(conf, "HBase_FindFriend"); job.setJarByClass(testHBaseMR.class); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname")); scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags")); TableMapReduceUtil .initTableMapperJob("blog", scan, Mapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); TableMapReduceUtil .initTableReducerJob("tag_friend", Reducer.class, job); System.exit(job.waitForCompletion(true) ? 0 : 1); } } TableMapReduceUtil.initTableMapperJob;TableMapReduceUtil.initTableReducerJob 用来指定map和reduce。相当于hbase的MR操作工具类吧~ 优化 针对行的键按数据排列的次序进行随机处理; 每个任务只实例化一个HTable对象; HTable.put(put)执行put操作时不使用任何缓冲。可以通过使用HTable.setAutoFlush(false),设置禁用自动刷入,并设置写缓冲大小,同时在任务的最后设置HTable.flushCommits()或者HTable.close(),以确保缓冲中最后没有剩下的未刷入的数据; 设计行键的时候要多加考虑,可以使用复合键,如果键是整数,则应该使用二进制形式以节省存储空间; HBase和RDBMS的比较 HBase: HBase是一个分布式的面向列的数据存储系统; HBase表可以很高和很宽(数十亿行,数百万列); 水平分区并在数千个商用机节点上自动复制; 表的模式是物理存储的直接反映; 不支持SQL; 不需要强一致性和参照完整性; 不支持索引;行是顺序存储的,每行中的列也是,不存在索引膨胀的问题,插入性能和表的大小无关 事务好像仅仅支持针对某一行的一系列Put/Delete操作。不同行、不同表间的操作是无法放在一个事务中的; HBase不支持条件查询和Order by等查询,读取记录只能按Row key(及其range)或全表扫描; 没有内置对连接操作的支持,但是由于表的宽度可以很大,一个宽行可以容下一个主键相关的所有数据,并不需要使用连接; RDBMS: 支持SQL; 需要强一致性和参照完整性; 支持事务,索引等; HBase的特性 没有索引,行是顺序存储的,每行中的列也是,不存在索引膨胀的问题,插入性能和表的大小无关; 自动分区,在表增长的时候,表会自动分裂成区域,并分布到可用的节点上; 线性扩展和对于新节点的自动处理,增加一个节点,把它指向现有的集群,并运行Regionserver,区域会自动重新进行平衡,负载会均匀分布; 普通商用硬件支持; 容错,大量节点意味着每个节点的重要性并不突出,不用担心单个节点失效; 批处理,支持MapReduce操作,可以用全并行的分布式作业来处理数据;