数据采集 ETL 工具 bboss-datatran v6.7.7 发布

数据采集 ETL 工具 bboss-datatran v6.7.7 发布,支持 Elasticsearch 8 以及其他 Elasticsearch 低版本和 Opensearch 之间数据同步。

  1. 新增轻量级但功能强大的大数据指标分析计算模块,可以非常方便地实现基于时间窗口的多种维度的实时指标计算和离线指标计算功能,适用于有限维度指标key和无限维度指标key,同时可以非常方便地将指标分析计算结果存储到各种数据库,以极低成本快速构建企业级大数据分析应用,导入以下包即可:
<dependency>
  <groupId>com.bbossgroups.plugins</groupId>
  <artifactId>bboss-datatran-metrics</artifactId>
  <version>6.7.7</version>
</dependency>

使用案例

https://gitee.com/bboss/bboss-elastic-tran/tree/master/bboss-datatran-core/src/test/java/org/frameworkset/tran/metrics

  1. 增量采集状态表increament_tab结构调整,增加以下字段:

    jobType varchar(500),用于保存输入插件类型,不能为空,不同的插件类型对应的值见下面的表格说明,避免不同类型作业加载增量状态时,互相干扰;

    jobId varchar(500),用于保存外部设置的作业id,可为空,相同的进程内启动多个同类型的输入插件的作业时,必须指定每个作业的jobId,避免各个作业加载增量状态时,互相干扰

插件类型 jobType jobId
HttpInputDataTranPlugin HttpInputDataTranPlugin
DBInputDataTranPlugin DBInputDataTranPlugin
ElasticsearchInputDataTranPlugin ElasticsearchInputDataTranPlugin
FileInputDataTranPlugin FileInputDataTranPlugin
HBaseInputDatatranPlugin HBaseInputDatatranPlugin
Kafka2InputDatatranPlugin Kafka2InputDatatranPlugin
MongoDBInputDatatranPlugin MongoDBInputDatatranPlugin

升级注意事项:升级6.7.7前,需要手动增加jobType和jobId两个字段,并修改increament_tab表中的状态记录,根据作业输入插件类型,填写正确的jobType,然后再启动作业,这样作业才能继续正常工作。

  1. 优化kafka输出插件任务状态记录管理功能,采用指标分析Metrics对数据发送情况进行聚合统计,按照指定的时间窗口进行聚合计算后,执行回调任务处理success方法,任务taskMetrics为聚合计算后的统计信息,可以通过开关控制是否进行预聚合功能:
kafkaOutputConfig.setEnableMetricsAgg(true);//启用预聚合功能
kafkaOutputConfig.setMetricsAggWindow(60);//指定统计时间窗口,单位:秒,默认值60秒

4. 优化kafka输入插件拦截器功能:定时记录统计插件消费kafka数据记录情况,并调用任务拦截器的aftercall方法输出统计jobMetrics信息,可以指定统计时间间隔:

kafka2InputConfig.setMetricsInterval(300 * 1000L);//300秒做一次任务拦截调用,默认值

5. 部分插件增加字段映射功能,涉及插件:日志采集插件、excel采集插件、生成日志/excel文件插件、kafka输入插件

文件采集插件字段映射配置示例:

FileInputConfig fileInputConfig = new FileInputConfig();
_fileInputConfig = fileInputConfig;
FileConfig fileConfig = new FileConfig();
    fileConfig.setFieldSplit(";");//指定日志记录字段分割符
  //指定字段映射配置
    fileConfig.addDateCellMapping(0, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());

    fileConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
    fileConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());

kafka映射配置示例:

Kafka2InputConfig kafka2InputConfig = new Kafka2InputConfig();

    kafka2InputConfig.setFieldSplit(";");//指定kafka记录字段分割符
  //指定字段映射配置
    kafka2InputConfig.addDateCellMapping(0, //记录切割得到的字段列表位置索引,从0开始
                excelCellMapping.getFieldName(), //映射的字段名称
                                          cellType, //字段值类型
                                          excelCellMapping.getDefaultValue(), //字段默认值
                                         excelCellMapping.getDataFormat());//字段格式:日期格式或者数字格式

    kafka2InputConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
    kafka2InputConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());

cellType取值范围:

public static final int CELL_BOOLEAN = 5;
public static final int CELL_DATE = 3;

public static final int CELL_NUMBER = 2;
public static final int CELL_NUMBER_INTEGER = 6;
public static final int CELL_NUMBER_LONG = 7;
public static final int CELL_NUMBER_FLOAT = 8;
public static final int CELL_NUMBER_SHORT = 9;
public static final int CELL_STRING = 1;

6. 增加全局JobContext,用于存放在作业中使用的初始化数据

7. 作业拦截器、任务拦截器异常方法参数Exception类型调整为Throwable类型

更多变更,参考提交记录:https://gitee.com/bboss/bboss-elastic-tran/commits/master

数据同步作业开发视频教程

https://www.bilibili.com/video/BV1xf4y1Z7xu

bboss 案例大全

https://esdoc.bbossgroups.com/#/bboss-datasyn-demo

Quick Start

https://esdoc.bbossgroups.com/#/quickstart

开发交流

https://www.bbossgroups.com/forum.html

 

优秀的个人博客,低调大师

微信关注我们

原文链接:https://www.oschina.net/news/222812

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Apache Tomcat7、8、9(Java Web服务器)

Apache Tomcat7、8、9(Java Web服务器)

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。