Appendix code:
HBase ----> HDFS (export from HBase to HDFS with MapReduce)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HBase2HDFS {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, HBase2HDFS.class.getSimpleName());
        job.setJarByClass(HBase2HDFS.class);
        // An MR job needs an input and an output. The input side is normally wired up with
        // FileInputFormat etc., but for reading from HBase the helper class TableMapReduceUtil
        // is used instead: it configures the table input with the table name and the Scan to run.
        TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), HBase2HDFSMapper.class,
                Text.class, Text.class, job);
        // Ship the HBase client jars with the MR job so the tasks can find them.
        TableMapReduceUtil.addDependencyJars(job);
        job.setMapperClass(HBase2HDFSMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //FileOutputFormat.setOutputPath(job, new Path("/t1-out"));
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
    }
    static class HBase2HDFSMapper extends TableMapper<Text, Text> {
        private Text rowKeyText = new Text();
        private Text value = new Text();

        // TableMapper fixes the map input types to ImmutableBytesWritable (the row key) and Result
        // (the row's cells); the two type parameters declared here are the map output key/value types.
        // TableMapper is simply a subclass of Mapper that only leaves these two output types open.
        // Values in HBase are stored as raw bytes (in the HBase shell they are written as quoted strings).
        @Override
        protected void map(
                ImmutableBytesWritable key,
                Result result,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // All of the row's cells are in the Result object. raw() used to be the way to pull
            // the data out of it, but raw() is deprecated:
            /*
            KeyValue[] raw = result.raw();
            for (KeyValue keyValue : raw) {
                keyValue.getValue();
            }
            */
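            /*
             * Non-deprecated alternative (a sketch against the newer client API, not code from
             * the original post; the calls are Result.listCells() and CellUtil.cloneValue()):
             *
             *   for (Cell cell : result.listCells()) {
             *       byte[] v = CellUtil.cloneValue(cell);   // copy of the cell value
             *   }
             */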
            /*
             * Desired output, one line per row:  1    zhangsan    13    (row key, name, age)
             *                                    2    lisi        14
             */

            // To read one specific column precisely you address it by row key, column family,
            // column qualifier and timestamp.
            // getColumnLatestCell returns the cell with the newest timestamp, so the timestamp
            // does not have to be given explicitly.
            byte[] nameBytes = CellUtil.cloneValue(result.getColumnLatestCell("cf".getBytes(), "name".getBytes()));
            byte[] ageBytes = CellUtil.cloneValue(result.getColumnLatestCell("cf".getBytes(), "age".getBytes()));

            rowKeyText.set(key.get());
            value.set(new String(nameBytes) + "\t" + new String(ageBytes));
            context.write(rowKeyText, value);
            // The mapper already emits lines of the form "1    name    age", so no reducer is needed.
        }
    }
}
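
The Scan passed to initTableMapperJob above is a default full-table scan. For larger tables it is common to narrow and tune the scan before handing it to the helper. A minimal sketch of what those lines in main() could look like (the family "cf" is the one the mapper reads; the caching value is an assumed tuning number, not something from the original post):

        Scan scan = new Scan();
        scan.addFamily("cf".getBytes());   // only fetch the family the mapper actually reads
        scan.setCaching(500);              // rows per RPC; 500 is an assumed value, tune per cluster
        scan.setCacheBlocks(false);        // keep the full scan from churning the region server block cache
        TableMapReduceUtil.initTableMapperJob(args[0], scan, HBase2HDFSMapper.class,
                Text.class, Text.class, job);
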
HDFS ----> HBase (import into HBase with MapReduce)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HDFS2HBaseImport {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Which table the data ends up in is decided here, via TableOutputFormat.OUTPUT_TABLE.
        conf.set(TableOutputFormat.OUTPUT_TABLE, args[0]);

        Job job = Job.getInstance(conf, HDFS2HBaseImport.class.getSimpleName());
        job.setJarByClass(HDFS2HBaseImport.class);

        // TableMapReduceUtil is still needed here, to ship the HBase client jars with the job.
        TableMapReduceUtil.addDependencyJars(job);
        job.setMapperClass(HDFS2HBaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(HDFS2HBaseReducer.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.setInputPaths(job, args[1]);
        job.waitForCompletion(true);
    }

    static class HDFS2HBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text rowKeyText = new Text();
        private Text value = new Text();

        @Override
        protected void map(LongWritable key, Text text,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] splits = text.toString().split("\t");
            rowKeyText.set(splits[0]);
            value.set(splits[1] + "\t" + splits[2]);    // name\tage
            context.write(rowKeyText, value);
        }
    }
    // This reducer mirrors the "Mapper extends TableMapper" used on export: TableReducer already
    // fixes the output value type to Mutation, and because the data goes into HBase the MR output
    // key is not needed, so NullWritable is used for the remaining type parameter.
    static class HDFS2HBaseReducer extends TableReducer<Text, Text, NullWritable> {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s,
                Reducer<Text, Text, NullWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // Rows are inserted into HBase with a Put object keyed by the row key.
            // copyBytes() is used instead of getBytes(): Text.getBytes() returns the whole backing
            // array, which may be longer than the actual value.
            Put put = new Put(k2.copyBytes());

            for (Text text : v2s) {
                String[] splits = text.toString().split("\t");
                // Load the columns and their values (addColumn in newer HBase client versions).
                put.add("cf".getBytes(), "name".getBytes(), splits[0].getBytes());
                put.add("cf".getBytes(), "age".getBytes(), splits[1].getBytes());
                // The first argument is the key, the second the value; HBase does not need the key,
                // so NullWritable is written together with the populated Put.
                context.write(NullWritable.get(), put);
            }
        }
    }
}
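
The mapper expects one record per input line in the form "rowkey \t name \t age". As an aside, the manual reducer wiring in main() (the OUTPUT_TABLE property plus setReducerClass and setOutputFormatClass) can also be done with a single helper call; a minimal sketch, not from the original post:

        // replaces the conf.set(TableOutputFormat.OUTPUT_TABLE, ...), setReducerClass(...)
        // and setOutputFormatClass(...) lines: points TableOutputFormat at the given table
        TableMapReduceUtil.initTableReducerJob(args[0], HDFS2HBaseReducer.class, job);
        TableMapReduceUtil.addDependencyJars(job);
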
This article is reproduced from the SummerChill cnblogs blog. Original link: http://www.cnblogs.com/DreamDrive/p/5583135.html. Please contact the original author before reprinting.