HBase Message File Processing Logic
Message format:
A new message file of this type arrives once every hour, so our processing approach is to run one processing pass per hour.
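The format itself is not reproduced in the text. Judging from the parser in DeviceReadThread below, each line of a device file is a single JSON object. A hypothetical sample line, with field names taken from the Device getters used below and values invented for illustration:

{"lid":"866001022000001","os":"android","osv":"4.4.2"}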
HbaseStarter loads the configuration and schedules one reader thread per message type, each running once an hour:

import java.io.FileInputStream;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.cmcc.aoi.util.OsUtil;

public class HbaseStarter {

    public static void main(String[] args) throws Exception {
        // Load the configuration file; the path differs between the
        // Windows development box and the Linux production host.
        Properties properties = new Properties();
        String config = "";
        if (!OsUtil.isLinux())
            config = "D:/work/util/aoi-hbase/trunk/src/main/resources/config.properties";
        else
            config = "/home/aoi/aoi-hbase/conf/config.properties";
        FileInputStream fis = new FileInputStream(config);
        properties.load(fis);
        fis.close();

        String hbaseTable = properties.getProperty("com.cmcc.aoi.ua.hbaseTable");
        String hbaseFamily = properties.getProperty("com.cmcc.aoi.ua.hbaseFamily");
        String sourceFilePath = properties.getProperty("com.cmcc.aoi.ua.sourceFilePath");
        String archivelogsPath = properties.getProperty("com.cmcc.aoi.ua.archivelogsPath");
        boolean isDebug = Integer.parseInt(properties.getProperty("com.cmcc.aoi.ua.isDebug")) != 0;
        String sourceFileName = properties.getProperty("com.cmcc.aoi.ua.sourceFileName");

        String[] hbaseTableName = hbaseTable.split(",");   // table names
        String[] hbaseFamilyName = hbaseFamily.split("&"); // family list per table
        String[] sourceFileNameArr = sourceFileName.split(",");

        // One scheduled thread per message type, each running once an hour.
        ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
        service.scheduleAtFixedRate(new DeviceReadThread(sourceFileNameArr[0], hbaseTableName[0],
                hbaseFamilyName[0].split(","), sourceFilePath, archivelogsPath, isDebug),
                0, 1, TimeUnit.HOURS);
        service.scheduleAtFixedRate(new AppReadThread(sourceFileNameArr[1], hbaseTableName[1],
                hbaseFamilyName[1].split(","), sourceFilePath, archivelogsPath, isDebug),
                0, 1, TimeUnit.HOURS);
    }
}
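The configuration file is not shown in the post. A sketch of what it might contain; only the key names and the '&'/',' split conventions come from the code above, every value is invented for illustration:

# keys read by HbaseStarter (values are examples only)
com.cmcc.aoi.ua.hbaseTable=device,app
# '&' separates the family lists of the two tables; ',' separates families within a table
com.cmcc.aoi.ua.hbaseFamily=f1,f2,f3&f1,f2
com.cmcc.aoi.ua.sourceFilePath=/home/aoi/data/
com.cmcc.aoi.ua.archivelogsPath=/home/aoi/archive/
com.cmcc.aoi.ua.isDebug=1
com.cmcc.aoi.ua.sourceFileName=device,app

Note that the first table's family list needs at least three entries, since DeviceReadThread indexes hbaseFamily[0] through hbaseFamily[2].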
DeviceReadThread parses each line of the file it is given as a Device JSON record and writes four cells per device to HBase:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import Model.Device;

import com.alibaba.fastjson.JSON;

public class DeviceReadThread extends BaseRunnabler {
    static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class);

    public DeviceReadThread(String sourceFileName, String hbaseTable, String[] hbaseFamily,
            String sourceFilePath, String archivelogsPath, boolean isDebug) {
        super(sourceFileName, hbaseTable, hbaseFamily, sourceFilePath, archivelogsPath, isDebug);
    }

    public void processFile(IOperator hu, File logFile) {
        FileReader logReader = null;
        BufferedReader logBufferedReader = null;
        try {
            logReader = new FileReader(logFile);
            logBufferedReader = new BufferedReader(logReader);
            String temp = logBufferedReader.readLine();
            while (temp != null) {
                // Each line is one JSON-encoded Device record.
                Device device = JSON.parseObject(temp, Device.class);
                // Row key is the device lid; each inner array is
                // { rowKey, family, qualifier, value }.
                String[][] s = new String[][] {
                        { device.getLid(), hbaseFamily[0], "lid", device.getLid() },
                        { device.getLid(), hbaseFamily[1], "date", (new Date()).toString() },
                        { device.getLid(), hbaseFamily[2], "os", device.getOs() },
                        { device.getLid(), hbaseFamily[2], "osv", device.getOsv() } };
                hu.writeMultRow(hbaseTable, s);
                logger.info("hbase util end");
                temp = logBufferedReader.readLine();
            }
        } catch (Exception e) {
            logger.error("DeviceReadThread error", e);
        } finally {
            if (logBufferedReader != null) {
                try {
                    logBufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (logReader != null) {
                try {
                    logReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
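The Device model is not included in the post. A minimal bean that would satisfy the fastjson call above; the field set is inferred from the getters DeviceReadThread uses, and any extra fields in the real class are unknown:

// Hypothetical reconstruction of Model.Device; only the fields
// read by DeviceReadThread are included. fastjson populates the
// bean through its setters when parsing each JSON line.
public class Device {
    private String lid; // device identifier, used as the HBase row key
    private String os;  // operating system name
    private String osv; // operating system version

    public String getLid() { return lid; }
    public void setLid(String lid) { this.lid = lid; }
    public String getOs() { return os; }
    public void setOs(String os) { this.os = os; }
    public String getOsv() { return osv; }
    public void setOsv(String osv) { this.osv = osv; }
}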
BaseRunnabler owns the shared flow: create the HBase table, scan the source directory for rotated files, hand each one to the concrete reader, then archive or delete it:

import java.io.File;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cmcc.aoi.util.FileUtil;

public abstract class BaseRunnabler implements Runnable {
    protected static Logger logger = LoggerFactory.getLogger(BaseRunnabler.class);

    String sourceFileName = "";  // base name of the source files to pick up
    String hbaseTable = "";      // HBase table name
    String[] hbaseFamily = null; // column family names
    String sourceFilePath;
    String archivelogsPath;
    boolean isDebug;

    public BaseRunnabler(String sourceFileName, String hbaseTable, String[] hbaseFamily,
            String sourceFilePath, String archivelogsPath, boolean isDebug) {
        this.hbaseTable = hbaseTable;
        this.hbaseFamily = hbaseFamily;
        this.sourceFileName = sourceFileName;
        this.sourceFilePath = sourceFilePath;
        this.archivelogsPath = archivelogsPath;
        this.isDebug = isDebug;
    }

    @Override
    public void run() {
        try {
            IOperator hu = new HbaseUtil(hbaseTable, hbaseFamily);
            hu.createTable(hbaseTable, hbaseFamily);

            File file = new File(sourceFilePath);
            File[] tempFileList = file.listFiles();
            if (tempFileList == null) {
                return;
            }
            Arrays.sort(tempFileList);
            for (File tempFile : tempFileList) {
                // Only pick up rotated files named "<sourceFileName>.<suffix>".
                if (tempFile.isFile() && tempFile.getName().contains(sourceFileName + ".")) {
                    try {
                        try {
                            processFile(hu, tempFile);
                        } catch (Exception e) {
                            // Keep going so one bad file does not block the rest.
                            logger.error("read file error, continue with the next file", e);
                            continue;
                        }
                        removeFile(tempFile);
                    } catch (Exception e2) {
                        logger.error("one file failed, other files must still be processed", e2);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public abstract void processFile(IOperator hu, File file) throws Exception;

    private void removeFile(File file) {
        if (isDebug) {
            // In debug mode keep the processed file by moving it to the archive directory.
            File path = new File(archivelogsPath);
            if (!path.exists()) {
                path.mkdirs();
            }
            FileUtil.moveFile(file, new File(archivelogsPath, file.getName()));
            logger.info("remove file: " + file.getName());
        } else {
            file.delete();
            logger.info("delete file: " + file.getName());
        }
    }
}
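Neither IOperator nor HbaseUtil appears in the post. A minimal sketch of what they could look like on the standard HBase client API, assuming the { rowKey, family, qualifier, value } layout used in DeviceReadThread; the real class's constructor and connection handling almost certainly differ:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

interface IOperator {
    void createTable(String tableName, String[] families) throws IOException;
    void writeMultRow(String tableName, String[][] rows) throws IOException;
}

// Hypothetical sketch of the helper used by BaseRunnabler and the reader threads.
public class HbaseUtil implements IOperator {
    private final Connection connection;

    public HbaseUtil(String tableName, String[] families) throws IOException {
        // Reads hbase-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();
        this.connection = ConnectionFactory.createConnection(conf);
    }

    @Override
    public void createTable(String tableName, String[] families) throws IOException {
        try (Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                for (String family : families) {
                    desc.addFamily(new HColumnDescriptor(family));
                }
                admin.createTable(desc);
            }
        }
    }

    @Override
    public void writeMultRow(String tableName, String[][] rows) throws IOException {
        // Each row is { rowKey, family, qualifier, value }.
        List<Put> puts = new ArrayList<>();
        for (String[] row : rows) {
            Put put = new Put(Bytes.toBytes(row[0]));
            put.addColumn(Bytes.toBytes(row[1]), Bytes.toBytes(row[2]), Bytes.toBytes(row[3]));
            puts.add(put);
        }
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(puts);
        }
    }
}

Batching all of one record's Puts into a single table.put(List) call keeps the write path at one round trip per parsed line rather than one per cell.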