
Working with HBase from Flink

Date: 2019-07-18

Here is the scenario: we need HBase to act as a data stream, not a data set. The flink-hbase module that ships with Flink can only give us a DataSet (a bounded, batch input), so we chose to rewrite the HBase source ourselves.
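For contrast, here is a minimal sketch of what the bundled connector gives you. It assumes the Flink 1.x flink-hbase addon and its org.apache.flink.addons.hbase.TableInputFormat; the table and column names are placeholders. The result is a bounded DataSet, so the job reads once and finishes:

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseBatchRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, String>> rows = env.createInput(
                new TableInputFormat<Tuple2<String, String>>() {
                    @Override
                    protected Scan getScanner() {
                        Scan scan = new Scan();
                        // Placeholder column; "cf1" / "name" stand in for real names.
                        scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"));
                        return scan;
                    }

                    @Override
                    protected String getTableName() {
                        return "Orders"; // placeholder table name
                    }

                    @Override
                    protected Tuple2<String, String> mapResultToTuple(Result r) {
                        // Row key plus one column value, as an example mapping.
                        return new Tuple2<>(Bytes.toString(r.getRow()),
                                Bytes.toString(r.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name"))));
                    }
                });
        rows.print(); // a one-shot, bounded read: no continuous stream
    }
}

Because this can only ever produce a DataSet, a custom SourceFunction is needed to turn HBase into a stream: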

package com.yjp.flink.demo11;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * HBase as a data source.
 * Reads rows from HBase and emits them as a stream.
 * Date : 9:50 2018/3/12
 */
public class HbaseSource implements SourceFunction<String> {

    private static Logger logger = LoggerFactory.getLogger(HbaseSource.class);
    private static final long serialVersionUID = 1;
    private volatile boolean isRunning = true;
    /** Starting timestamp. */
    private long startTime;
    /** Length of the time window queried on each pass. */
    private long interval;
    /** Columns to query. */
    private ArrayList<String> columns;
    /** Table to query. */
    private String tableName;

    public HbaseSource(long startTime, long interval, ArrayList<String> columns, String tableName) {
        this.startTime = startTime;
        this.interval = interval;
        this.columns = columns;
        this.tableName = tableName;
    }

    public HbaseSource() {
    }

    @Override
    public void run(SourceContext<String> out) {
        // First pass: catch up from startTime to (now - interval) in one scan.
        if (isRunning) {
            long endTime = DateTime.now().getMillis() - interval;
            ResultScanner rs = getHbaseData(tableName, startTime, endTime - startTime, columns);
            transmitData(rs, out);
            startTime = endTime;
        }
        // Then poll one interval-sized window at a time.
        while (isRunning) {
            ResultScanner rs = getHbaseData(tableName, startTime, interval, columns);
            transmitData(rs, out);
            startTime += interval;
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                throw new RuntimeException("sleep interrupted", e);
            }
        }
    }

    @Override
    public void cancel() {
        // Stop both loops in run().
        isRunning = false;
    }

    /**
     * Fetch a result set.
     *
     * @param startTime start of the time range (epoch millis)
     * @param interval  length of the time range
     * @return the matching result scanner
     */
    private ResultScanner getHbaseData(String tableName, long startTime, long interval, List<String> columns) {
        Configuration conf = HBaseConfiguration.create();
        try {
            HTable table = new HTable(conf, tableName);
            Scan scan = new Scan();
            scan.setTimeRange(startTime, startTime + interval);
            for (String column : columns) {
                // Columns are passed as "family:qualifier".
                String[] columnName = column.split(":");
                scan.addColumn(Bytes.toBytes(columnName[0]), Bytes.toBytes(columnName[1]));
            }
            return table.getScanner(scan);
        } catch (IOException e) {
            throw new RuntimeException("failed to read from HBase", e);
        }
    }

    private void transmitData(ResultScanner rs, SourceContext<String> out) {
        Result result;
        try {
            while ((result = rs.next()) != null && isRunning) {
                KeyValue[] kvs = result.raw();
                for (KeyValue kv : kvs) {
                    out.collect(new String(kv.getValue()));
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("failed to iterate the result set", e);
        }
    }
}
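For orientation, here is a minimal wiring sketch (not from the original article; the table and column names are placeholders) showing HbaseSource plugged into a streaming job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class HbaseSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> columns = new ArrayList<>();
        columns.add("cf1:name"); // "family:qualifier", the format getHbaseData() splits on
        // Start ten seconds in the past and poll a 2-second window at a time.
        DataStream<String> values = env.addSource(
                new HbaseSource(System.currentTimeMillis() - 10000L, 2000L, columns, "Orders"));
        values.print();
        env.execute("HbaseSource demo");
    }
}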
The data is then processed and the results written back into HBase:

package com.yjp.flink.hbase;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class HbaseToHbase {

    public static Logger logger = LoggerFactory.getLogger(HbaseToHbase.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
        sEnv.getConfig().disableSysoutLogging();
        List<String> getColumns = new ArrayList<>(3);
        getColumns.add("cf1_name");
        getColumns.add("cf2_amount");
        getColumns.add("cf3_groupId");
        List<String> columnFamily = new ArrayList<>(3);
        columnFamily.add("cf1");
        columnFamily.add("cf2");
        columnFamily.add("cf3");
        List<String> setColumns = new ArrayList<>(3);
        setColumns.add("cf2:result");
        // HbaseStreamDataSource and Orders are project classes not shown in the
        // article: a POJO-mapping variant of the HbaseSource above.
        DataStreamSource<Orders> orderDataStream =
                sEnv.addSource(new HbaseStreamDataSource("Orders", 0L, 2000L, getColumns, Orders.class));
        DataStream<Tuple3<String, Double, Integer>> dataStream = orderDataStream.flatMap(
                new FlatMapFunction<Orders, Tuple3<String, Double, Integer>>() {
                    @Override
                    public void flatMap(Orders value, Collector<Tuple3<String, Double, Integer>> out) throws Exception {
                        out.collect(new Tuple3<>(value.getCf1_name(), value.getCf2_amount(), value.getCf3_groupId()));
                    }
                });
        dataStream.keyBy(2).sum(1).addSink(
                new SinkHbase<Tuple3<String, Double, Integer>>("OrderResult", columnFamily, setColumns, "result"));
        sEnv.execute("test Hbase");
    }
}

The custom sink creates the target table in open() if it does not exist, then dispatches on the tuple's arity via reflection:

package com.yjp.flink.hbase;

import org.apache.flink.api.java.tuple.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;

import java.lang.reflect.Method;
import java.util.*;

/**
 * Custom sink.
 * Date : 17:23 2018/3/12
 */
public class SinkHbase<T> extends RichSinkFunction<T> {

    private static final long serialVersionUID = 1L;
    /** Table name. */
    private String tableName;
    /** Column family names. */
    private List<String> columnFamilies;
    /** Column names, passed as "family:qualifier"; each column maps to one tuple field, in order. */
    private List<String> columns;
    /** Row key. */
    private String rowKey;

    /**
     * @param tableName    table name
     * @param columnFamily column family names (only needed when the table does not exist yet)
     * @param columns      target columns, as "family:qualifier"
     * @param rowKey       row key to write to
     */
    public SinkHbase(String tableName, List<String> columnFamily, List<String> columns, String rowKey) {
        this.tableName = tableName;
        this.columnFamilies = columnFamily;
        this.columns = columns;
        this.rowKey = rowKey;
    }

    /**
     * @param tableName table name
     * @param columns   target columns, as "family:qualifier"
     * @param rowKey    row key to write to
     */
    public SinkHbase(String tableName, List<String> columns, String rowKey) {
        this.tableName = tableName;
        this.columns = columns;
        this.rowKey = rowKey;
    }

    public SinkHbase() {
    }

    /**
     * Set up the connection; if the table does not exist, create it with the given column families.
     *
     * @param parameters passed through to the superclass
     * @throws Exception if the connection cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Admin admin = FactoryConnect.getConnection().getAdmin();
        final TableName table = TableName.valueOf(tableName);
        if (!admin.tableExists(table)) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
            for (String columnFamily : columnFamilies) {
                hTableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
            }
            admin.createTable(hTableDescriptor);
        }
    }

    /**
     * Write one record into HBase, picking the handler that matches the tuple's arity.
     *
     * @param value the incoming record
     */
    @Override
    public void invoke(T value, Context context) throws Exception {
        Map<Class, Method> map = new HashMap<>(25);
        initMap(map);
        Table table = FactoryConnect.getConnection().getTable(TableName.valueOf(tableName));
        for (Class key : map.keySet()) {
            if (value.getClass() == key) {
                map.get(key).invoke(new AssignmentTuple(), value, rowKey, columns, table);
                return;
            }
        }
        // Records that are not tuples of arity 1..25 are silently ignored.
    }

    private void initMap(Map<Class, Method> map) {
        // Register one handler per tuple arity, Tuple1 through Tuple25.
        try {
            map.put(Tuple1.class, AssignmentTuple.class.getMethod("setTuple1", Tuple1.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple2.class, AssignmentTuple.class.getMethod("setTuple2", Tuple2.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple3.class, AssignmentTuple.class.getMethod("setTuple3", Tuple3.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple4.class, AssignmentTuple.class.getMethod("setTuple4", Tuple4.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple5.class, AssignmentTuple.class.getMethod("setTuple5", Tuple5.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple6.class, AssignmentTuple.class.getMethod("setTuple6", Tuple6.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple7.class, AssignmentTuple.class.getMethod("setTuple7", Tuple7.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple8.class, AssignmentTuple.class.getMethod("setTuple8", Tuple8.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple9.class, AssignmentTuple.class.getMethod("setTuple9", Tuple9.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple10.class, AssignmentTuple.class.getMethod("setTuple10", Tuple10.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple11.class, AssignmentTuple.class.getMethod("setTuple11", Tuple11.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple12.class, AssignmentTuple.class.getMethod("setTuple12", Tuple12.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple13.class, AssignmentTuple.class.getMethod("setTuple13", Tuple13.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple14.class, AssignmentTuple.class.getMethod("setTuple14", Tuple14.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple15.class, AssignmentTuple.class.getMethod("setTuple15", Tuple15.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple16.class, AssignmentTuple.class.getMethod("setTuple16", Tuple16.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple17.class, AssignmentTuple.class.getMethod("setTuple17", Tuple17.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple18.class, AssignmentTuple.class.getMethod("setTuple18", Tuple18.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple19.class, AssignmentTuple.class.getMethod("setTuple19", Tuple19.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple20.class, AssignmentTuple.class.getMethod("setTuple20", Tuple20.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple21.class, AssignmentTuple.class.getMethod("setTuple21", Tuple21.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple22.class, AssignmentTuple.class.getMethod("setTuple22", Tuple22.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple23.class, AssignmentTuple.class.getMethod("setTuple23", Tuple23.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple24.class, AssignmentTuple.class.getMethod("setTuple24", Tuple24.class, String.class, ArrayList.class, Table.class));
            map.put(Tuple25.class, AssignmentTuple.class.getMethod("setTuple25", Tuple25.class, String.class, ArrayList.class, Table.class));
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("reflective method lookup failed", e);
        }
    }
}

The connection is obtained through a double-checked-locking singleton:

package com.yjp.flink.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;
import java.io.Serializable;

/**
 * Singleton that safely hands out the HBase connection.
 * Date : 16:45 2018/3/16
 */
public class FactoryConnect implements Serializable {

    private static volatile Connection connection;

    private FactoryConnect() {
    }

    public static Connection getConnection() throws IOException {
        if (null == connection) {
            synchronized (FactoryConnect.class) {
                if (null == connection) {
                    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
                    connection = ConnectionFactory.createConnection(conf);
                }
            }
        }
        return connection;
    }
}

Finally, the class that copies tuple fields into HBase columns:

package com.yjp.flink.hbase;

import org.apache.flink.api.java.tuple.*;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Stores the contents of a tuple in HBase.
 * Date : 16:49 2018/3/12
 */
public class AssignmentTuple {

    /**
     * Arity-1 handler; the remaining handlers follow the same pattern.
     * Raw tuple types keep the signatures aligned with the reflective lookup in SinkHbase.
     *
     * @param tuple1  the incoming tuple
     * @param rowKey  the row key to write to
     * @param columns the columns to assign, in field order
     * @param table   the Table to put into
     */
    public void setTuple1(Tuple1 tuple1, String rowKey, ArrayList<String> columns, Table table) { putData(tuple1, rowKey, columns, table); }
    public void setTuple2(Tuple2 tuple2, String rowKey, ArrayList<String> columns, Table table) { putData(tuple2, rowKey, columns, table); }
    public void setTuple3(Tuple3 tuple3, String rowKey, ArrayList<String> columns, Table table) { putData(tuple3, rowKey, columns, table); }
    public void setTuple4(Tuple4 tuple4, String rowKey, ArrayList<String> columns, Table table) { putData(tuple4, rowKey, columns, table); }
    public void setTuple5(Tuple5 tuple5, String rowKey, ArrayList<String> columns, Table table) { putData(tuple5, rowKey, columns, table); }
    public void setTuple6(Tuple6 tuple6, String rowKey, ArrayList<String> columns, Table table) { putData(tuple6, rowKey, columns, table); }
    public void setTuple7(Tuple7 tuple7, String rowKey, ArrayList<String> columns, Table table) { putData(tuple7, rowKey, columns, table); }
    public void setTuple8(Tuple8 tuple8, String rowKey, ArrayList<String> columns, Table table) { putData(tuple8, rowKey, columns, table); }
    public void setTuple9(Tuple9 tuple9, String rowKey, ArrayList<String> columns, Table table) { putData(tuple9, rowKey, columns, table); }
    public void setTuple10(Tuple10 tuple10, String rowKey, ArrayList<String> columns, Table table) { putData(tuple10, rowKey, columns, table); }
    public void setTuple11(Tuple11 tuple11, String rowKey, ArrayList<String> columns, Table table) { putData(tuple11, rowKey, columns, table); }
    public void setTuple12(Tuple12 tuple12, String rowKey, ArrayList<String> columns, Table table) { putData(tuple12, rowKey, columns, table); }
    public void setTuple13(Tuple13 tuple13, String rowKey, ArrayList<String> columns, Table table) { putData(tuple13, rowKey, columns, table); }
    public void setTuple14(Tuple14 tuple14, String rowKey, ArrayList<String> columns, Table table) { putData(tuple14, rowKey, columns, table); }
    public void setTuple15(Tuple15 tuple15, String rowKey, ArrayList<String> columns, Table table) { putData(tuple15, rowKey, columns, table); }
    public void setTuple16(Tuple16 tuple16, String rowKey, ArrayList<String> columns, Table table) { putData(tuple16, rowKey, columns, table); }
    public void setTuple17(Tuple17 tuple17, String rowKey, ArrayList<String> columns, Table table) { putData(tuple17, rowKey, columns, table); }
    public void setTuple18(Tuple18 tuple18, String rowKey, ArrayList<String> columns, Table table) { putData(tuple18, rowKey, columns, table); }
    public void setTuple19(Tuple19 tuple19, String rowKey, ArrayList<String> columns, Table table) { putData(tuple19, rowKey, columns, table); }
    public void setTuple20(Tuple20 tuple20, String rowKey, ArrayList<String> columns, Table table) { putData(tuple20, rowKey, columns, table); }
    public void setTuple21(Tuple21 tuple21, String rowKey, ArrayList<String> columns, Table table) { putData(tuple21, rowKey, columns, table); }
    public void setTuple22(Tuple22 tuple22, String rowKey, ArrayList<String> columns, Table table) { putData(tuple22, rowKey, columns, table); }
    public void setTuple23(Tuple23 tuple23, String rowKey, ArrayList<String> columns, Table table) { putData(tuple23, rowKey, columns, table); }
    public void setTuple24(Tuple24 tuple24, String rowKey, ArrayList<String> columns, Table table) { putData(tuple24, rowKey, columns, table); }
    public void setTuple25(Tuple25 tuple25, String rowKey, ArrayList<String> columns, Table table) { putData(tuple25, rowKey, columns, table); }

    /**
     * Assign each tuple field to its column, one to one.
     *
     * @param tuple   the tuple holding the data
     * @param rowKey  the row key to write
     * @param columns the target column names
     * @param table   the Table to put into
     */
    public void putData(Tuple tuple, String rowKey, List<String> columns, Table table) {
        Put put = new Put(Bytes.toBytes(rowKey));
        long timeStamp = Instant.now().toEpochMilli();
        for (int i = 0; i < columns.size(); i++) {
            String[] split = columns.get(i).split(":");
            put.addColumn(Bytes.toBytes(split[0]), Bytes.toBytes(split[1]), timeStamp,
                    Bytes.toBytes(tuple.getField(i).toString()));
        }
        try {
            table.put(put);
        } catch (IOException e) {
            throw new RuntimeException("failed to write to HBase", e);
        }
    }
}

To end up with a generic data source and data sink, we turned to reflection: the method table in SinkHbase picks the right setTupleN handler for whatever tuple arity arrives.
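For reference, the same genericity can be had without the 25-entry method table: every TupleN extends Tuple, which already exposes getArity() and getField(int). The sketch below is an alternative of mine, not the article's code; it reuses the FactoryConnect helper above, dispatches on the Tuple supertype directly, and opens the Table once in open() instead of once per record:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.List;

/**
 * Alternative sketch: a generic tuple sink with no reflective dispatch.
 * Assumes the target table and column families already exist.
 */
public class SimpleTupleSink<T extends Tuple> extends RichSinkFunction<T> {

    private final String tableName;
    private final List<String> columns; // "family:qualifier", one per tuple field
    private final String rowKey;
    private transient Table table;      // opened once per task, not per record

    public SimpleTupleSink(String tableName, List<String> columns, String rowKey) {
        this.tableName = tableName;
        this.columns = columns;
        this.rowKey = rowKey;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        table = FactoryConnect.getConnection().getTable(TableName.valueOf(tableName));
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        Put put = new Put(Bytes.toBytes(rowKey));
        // One column per tuple field, in declaration order; the bounds check
        // guards against a columns list longer than the tuple's arity.
        for (int i = 0; i < Math.min(columns.size(), value.getArity()); i++) {
            String[] fq = columns.get(i).split(":");
            put.addColumn(Bytes.toBytes(fq[0]), Bytes.toBytes(fq[1]),
                    Bytes.toBytes(value.getField(i).toString()));
        }
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) {
            table.close();
        }
        super.close();
    }
}

The trade-off is a compile-time bound (T extends Tuple) instead of the runtime dispatch above; for non-tuple inputs you would still flatMap into tuples first, or fall back to the reflective route.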

Original article: https://yq.aliyun.com/articles/710003